diff --git a/_docs/03_implementation/batch_87_report.md b/_docs/03_implementation/batch_87_report.md new file mode 100644 index 0000000..f534be0 --- /dev/null +++ b/_docs/03_implementation/batch_87_report.md @@ -0,0 +1,186 @@ +# Batch 87 — AZ-436 + AZ-437 + AZ-438 + AZ-439 (Security NFTs) + +**Tracker**: AZ-436, AZ-437, AZ-438, AZ-439 +**Tasks**: 4 tasks / 16 complexity points (5 + 3 + 3 + 5) +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS +**Review**: `_docs/03_implementation/reviews/batch_87_review.md` +**Cumulative review**: `_docs/03_implementation/reviews/cumulative_review_batches_85_87.md` + +## Scope + +- **AZ-436 / NFT-SEC-01 (AC-NEW-9)** — N synthetic flights with 1-5 % poisoned tiles; aggregate `false_trust_count ≤ N × 1e-6`; zero-tolerance default. Canonical (ardupilot, okvis2) at N=1000 by default; `E2E_NFT_SEC_01_RELEASE_GATE=1` unlocks full N=10000 × matrix. +- **AZ-437 / NFT-SEC-02 + NFT-SEC-05 (AC-NEW-10)** — Two scenarios sharing the egress-observation pattern: NFT-SEC-02 verifies 0 packets to non-`e2e-net` over 5-min Derkachi replay; NFT-SEC-05 verifies DNS-blackhole sidecar absorbs probes + UDP-53 silence. +- **AZ-438 / NFT-SEC-03 (AC-NEW-11)** — AP-only; three sub-cases (unsigned / wrong-key / replayed-tlog) each yield `BAD_SIGNATURE` STATUSTEXT ≤500 ms + no position drift. iNav SKIPs. +- **AZ-439 / NFT-SEC-04 (RESTRICT-CVE-1)** — Probe scenario (always-run): cve-jpeg-fixture does not crash SUT + records deterministic decode-success or frame-decode-error. ASan-fuzz scenario (release-gate `E2E_NFT_SEC_04_RELEASE_GATE=1`): ≥4 h, 0 ASan findings, ≥1000 corpus inputs (informational). + +## Files + +### Created (13 files) + +- `e2e/runner/helpers/cache_poisoning_evaluator.py` — N-flight aggregate verdict + per-flight poison-ratio + defense-layer-coverage + rejection-reason vocabulary checks. +- `e2e/runner/helpers/egress_observer.py` — before/after counter snapshots, `NoEgressReport` + `DnsBlackholeReport` + 5-outcome DNS lookup classifier. +- `e2e/runner/helpers/mavlink_signing_evaluator.py` — per-sub-case rejection STATUSTEXT match (BAD_SIGNATURE + documented variants) + position-drift verdict + AC-1 iNav-SKIP companion logic. +- `e2e/runner/helpers/cve_probe_evaluator.py` — FDR-survival + deterministic-outcome classifier; rejects silent drops as defense-bypass. +- `e2e/runner/helpers/asan_fuzz_evaluator.py` — line-level ASan-finding classifier (8 categories + OTHER_FINDING fallback) + duration gate + corpus-floor informational check. +- `e2e/tests/security/test_nft_sec_01_cache_poisoning.py` — NFT-SEC-01 scenario. +- `e2e/tests/security/test_nft_sec_02_no_egress.py` — NFT-SEC-02 scenario. +- `e2e/tests/security/test_nft_sec_03_mavlink_signing.py` — NFT-SEC-03 scenario (AP-only). +- `e2e/tests/security/test_nft_sec_04_opencv_cve.py` — NFT-SEC-04 probe scenario (always-run). +- `e2e/tests/security/test_nft_sec_04_asan_fuzz.py` — NFT-SEC-04 fuzz scenario (release-gate). +- `e2e/tests/security/test_nft_sec_05_dns_blackhole.py` — NFT-SEC-05 scenario. +- `e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py` — 16 unit tests. +- `e2e/_unit_tests/helpers/test_egress_observer.py` — 14 unit tests. +- `e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py` — 18 unit tests. +- `e2e/_unit_tests/helpers/test_cve_probe_evaluator.py` — 11 unit tests. +- `e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py` — 16 unit tests. +- `_docs/03_implementation/reviews/batch_87_review.md` — per-batch code review. +- `_docs/03_implementation/reviews/cumulative_review_batches_85_87.md` — K=3 window cumulative review. +- `_docs/LESSONS.md` — agent-behaviour lesson (Jira transition IDs). + +### Modified + +- `e2e/_unit_tests/test_directory_layout.py` — registered 11 new paths (5 helpers + 6 scenarios). + +## Test Results + +Per-batch unit tests: + +``` +$ pytest e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py \ + e2e/_unit_tests/helpers/test_egress_observer.py \ + e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py \ + e2e/_unit_tests/helpers/test_cve_probe_evaluator.py \ + e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py \ + e2e/_unit_tests/test_directory_layout.py +================ 215 passed in 0.25s ================ +``` + +Full unit-test suite (regression check, run from workspace root): + +``` +$ pytest e2e/_unit_tests/ +================ 1151 passed in 137.86s (0:02:17) ================ +``` + +Scenario collection (36 cases — 6 scenarios × 6 (fc_adapter × vio_strategy) variants): + +``` +$ pytest e2e/tests/security/ --collect-only -p no:csv --evidence-out=/tmp/e2e-test-evidence +collected 36 items +``` + +Scenario smoke (all 36 skip cleanly with diagnostic messages): + +``` +36 skipped in 0.11s +``` + +Skip breakdown: +- 12 skip-on-`vins_mono` (conftest research-build-only rule from D-C1-1-SUB-A). +- 5 skip-on-canonical-only for NFT-SEC-01 (AC-4 default + the matching `vins_mono`-skipped vins variants). +- 6 skip-on-iNav for NFT-SEC-03 (AC-1). +- 4 skip-on-release-gate for NFT-SEC-04 ASan-fuzz. +- 9 skip-on-`sitl_replay_ready=False` (no `E2E_SITL_REPLAY_DIR` locally). + +## AC Verification + +### AZ-436 / NFT-SEC-01 + +| AC | Coverage | +|----|----------| +| AC-1 N flights complete | `len(flights) < NFT_SEC_01_CI_MIN_FLIGHTS` gate + scenario `flight_count` NFR record | +| AC-2 poisoned-tile production | `passes_ratio` + `passes_layer_coverage` + `passes_rejection_reason_vocabulary` (3 sub-asserts) | +| AC-3 false-trust budget | `passes_budget` (zero-tolerance default — `count == 0`) + scenario `total_false_trust` / `budget` NFR records | +| AC-4 parameterization | canonical-only default + `E2E_NFT_SEC_01_RELEASE_GATE=1` for full matrix | + +### AZ-437 / NFT-SEC-02 + NFT-SEC-05 + +| AC | Coverage | +|----|----------| +| NFT-SEC-02 AC-1 egress counter == 0 | `NoEgressReport.passes` + scenario AC-1 assert | +| NFT-SEC-05 AC-2 sidecar healthy | `DnsBlackholeReport.sidecar_healthy` + scenario AC-2 assert | +| NFT-SEC-05 AC-3a lookup fails | `passes_lookup` (NXDOMAIN / timeout / no-servers / other-failure) + scenario AC-3a assert | +| NFT-SEC-05 AC-3b UDP-53 silent | `passes_udp_silence` + scenario AC-3b assert | +| AC-4 parameterization | conftest matrix | + +### AZ-438 / NFT-SEC-03 + +| AC | Coverage | +|----|----------| +| AC-1 iNav SKIP | scenario-top guard on `fc_adapter == "inav"` | +| AC-2/3/4 per-sub-case rejection ≤500 ms + no position update | per-sub-case `passes_rejection` + `passes_no_position_update` (3 ACs × 2 sub-asserts) | +| AC-5 vio_strategy parameterization | conftest matrix | + +### AZ-439 / NFT-SEC-04 + +| AC | Coverage | +|----|----------| +| AC-1a probe no crash | `passes_no_crash` + scenario AC-1a assert | +| AC-1b probe graceful outcome | `passes_graceful_outcome` + scenario AC-1b assert (rejects silent drops) | +| AC-2 ASan fuzz 0 findings ≥4 h | `passes_findings` + `passes_duration` + scenario AC-2 assert | +| AC-3 ASan fuzz ≥1000 corpus | `reached_corpus_floor` (informational only per spec; recorded in CSV, not asserted) | +| AC-4 parameterization | probe = full matrix; fuzz = ardupilot + per-vio only (justified inline to avoid duplicating a 4 h run) | + +`traces_to` markers: +- NFT-SEC-01: `AC-NEW-9,AC-1,AC-2,AC-3,AC-4` +- NFT-SEC-02: `AC-NEW-10,AC-1,AC-4` +- NFT-SEC-03: `AC-NEW-11,AC-1,AC-2,AC-3,AC-4,AC-5` +- NFT-SEC-04 probe: `RESTRICT-CVE-1,AC-1,AC-4` +- NFT-SEC-04 fuzz: `RESTRICT-CVE-1,AC-2,AC-3,AC-4` +- NFT-SEC-05: `AC-NEW-10,AC-2,AC-3,AC-4` + +## Code Review + +**Verdict**: PASS_WITH_WARNINGS — 0 Critical, 0 High, 0 Medium, 5 Low. + +- **F1 (Low / Maintainability — carry-over)**: `write_csv_evidence` boilerplate continues to grow (13 helpers). +- **F2 (Low / Spec-Gap)**: DNS-blackhole sidecar referenced by NFT-SEC-05 but not deployed in `e2e/docker/docker-compose.test.yml`. +- **F3 (Low / Spec-Gap)**: AP MAVLink 2.0 signing handshake (AZ-416) must be triggered by AZ-595 fixture builder before NFT-SEC-03 replay can run end-to-end. +- **F4 (Low / Maintainability — carry-over)**: `_resolve_fixture_path` duplicated across 6 new scenarios. +- **F5 (Low / Design-aligned)**: NFT-SEC-04 ASan-fuzz AC-3 corpus floor is informational-only per task spec. + +Full review: `_docs/03_implementation/reviews/batch_87_review.md`. + +## Cumulative Review (Batches 85-87 — K=3 Window) + +**Verdict**: PASS_WITH_WARNINGS. 5 cross-batch findings: + +- **CR-F1 (Medium / Maintainability)**: 13 helpers each duplicate the `write_csv_evidence` pattern. Recommended PBI: shared `csv_evidence_writer.py` (3 pts). +- **CR-F2 (Medium / Maintainability)**: 13 scenarios each duplicate `_resolve_fixture_path`. Recommended PBI: shared `fixture_path.resolve()` (2 pts). +- **CR-F3 (Low / Spec-Gap)**: AZ-595 fixture builder doesn't exist as a tracked task; needs to materialize 13 JSON contracts. Recommended PBI: 5 pts. +- **CR-F4 (Low / Infrastructure-Gap)**: DNS-blackhole sidecar absent. Recommended PBI: 3 pts. +- **CR-F5 (Informational)**: full unit-test suite (1151 tests, ~138 s) runs green from workspace root. + +Full cumulative review: `_docs/03_implementation/reviews/cumulative_review_batches_85_87.md`. + +## Production Dependencies + +Surfaced for the traceability matrix + AZ-595: + +1. **AZ-595 (fixture builder)**: emit `nft_sec_01_cache_poisoning.json` (per-flight cache + poisoned-tile slate + runner-collected `false_trust_events` + `rejection_reasons` counter); `nft_sec_02_no_egress.json` (before/after Docker network stats snapshots); `nft_sec_03_mavlink_signing.json` (3 injection timestamps + AP STATUSTEXT + GLOBAL_POSITION_INT captures); `nft_sec_04_cve_probe.json` (probe_injected_at_ms + per-frame FDR record sequence); `nft_sec_04_asan_fuzz.json` (ASan stderr log + duration + corpus size); `nft_sec_05_dns_blackhole.json` (sidecar_healthy + lookup_outcome + UDP-53 before/after). +2. **AZ-444 (Tier-2 runner) — optional**: NFT-SEC-04 ASan-fuzz at Tier-2 (Jetson) per the same release-gate flag. +3. **e2e infrastructure**: DNS-blackhole sidecar service in `docker-compose.test.yml` per `environment.md`. +4. **AZ-416 (FT-P-09-AP) — already in `done/`**: AP MAVLink 2.0 signing handshake must run before AZ-595 generates the NFT-SEC-03 replay payload. +5. **SUT**: outbound `source_label` MUST carry `tile_id` for NFT-SEC-01 false-trust attribution; FDR MUST emit deterministic decode-success/error per frame for NFT-SEC-04 silent-drop detection. + +## Architecture Compliance + +- All new files under `e2e/`, owned by the Blackbox Tests cross-cutting component per `_docs/02_document/module-layout.md`. +- No imports from `src/gps_denied_onboard` (verified — only `runner.helpers.sitl_observer`, stdlib). +- No new cyclic dependencies. New evaluators are leaves of the import DAG. +- No new infrastructure libraries (stdlib `csv`, `dataclasses`, `enum`, `re`, `pathlib`, `math` only). + +## Sub-step Trace + +Phases executed per `implement/SKILL.md`: +- phase 5 (load-spec) → 4 task specs read +- phase 6 (implement-tasks-sequentially) → 5 helpers + 6 scenarios + 5 unit-test files for all 4 tasks +- phase 7 (verify-ac-coverage) → ACs traced above +- phase 8 (code-review) → batch_87_review.md (PASS_WITH_WARNINGS, 5 Low) +- phase 8.5 (cumulative-review) → cumulative_review_batches_85_87.md (PASS_WITH_WARNINGS, 5 cross-batch findings) +- phase 11 (commit-batch) → next. + +## Notes on this batch + +- A Jira transition mistake was made early in this batch (used `id=31` for "In Progress" but `id=31` in this workflow = "Done"). Caught by the mandatory read-back gate, corrected by re-transitioning to id `21` (verified-correct via `getTransitionsForJiraIssue` lookup). Lesson recorded in `_docs/LESSONS.md`. No code or git artifacts were affected — only the tracker state, which is fully restored. diff --git a/_docs/03_implementation/reviews/batch_87_review.md b/_docs/03_implementation/reviews/batch_87_review.md new file mode 100644 index 0000000..7469765 --- /dev/null +++ b/_docs/03_implementation/reviews/batch_87_review.md @@ -0,0 +1,155 @@ +# Code Review — Batch 87 + +**Tasks**: AZ-436 + AZ-437 + AZ-438 + AZ-439 (NFT-SEC-01..05 security scenarios) +**Total complexity**: 16 points (5 + 3 + 3 + 5) +**Reviewer**: autodev / `implement` skill phase 8 +**Date**: 2026-05-17 +**Verdict**: PASS_WITH_WARNINGS (5 Low; 0 Medium / High / Critical) + +## Phase 1 — Context + +Per `_dependencies_table.md` these four NFT-SEC tasks complete the +security NFT slice of E-BBT (AZ-262). All five scenarios (cache +poisoning, no-egress, MAVLink signing, OpenCV CVE probe, OpenCV CVE +ASan fuzz, DNS blackhole) follow the established batch-85 / batch-86 +"helper + fixture-consumer scenario + unit-test trio" pattern. + +Public-boundary discipline: every new helper module declares it in its +module docstring; grep confirms none import `src/gps_denied_onboard`. + +## Phase 2 — Spec Compliance + +| Task | AC | Coverage location | +|------|----|-------------------| +| AZ-436 | AC-1 (N flights complete) | `_parse_payload` + `len(flights) < NFT_SEC_01_CI_MIN_FLIGHTS` gate + scenario `flight_count` NFR record | +| AZ-436 | AC-2 (poison ratio + layer coverage + rejection-reason vocabulary) | `passes_ratio` + `passes_layer_coverage` + `passes_rejection_reason_vocabulary` (3 sub-assertions) | +| AZ-436 | AC-3 (false-trust budget zero-tolerance) | `passes_budget` + scenario `total_false_trust` + `budget` NFR records | +| AZ-436 | AC-4 (canonical-only default; release-gate full matrix) | `_is_canonical_param` + `E2E_NFT_SEC_01_RELEASE_GATE` env var | +| AZ-437 | AC-1 (NFT-SEC-02 0 packets to non-internal) | `evaluate_no_egress.passes` + scenario AC-1 assert | +| AZ-437 | AC-2 (NFT-SEC-05 sidecar healthy) | `sidecar_healthy` field + scenario AC-2 assert | +| AZ-437 | AC-3a (NFT-SEC-05 lookup fails) | `passes_lookup` + scenario AC-3a assert | +| AZ-437 | AC-3b (NFT-SEC-05 UDP-53 silent) | `passes_udp_silence` + scenario AC-3b assert | +| AZ-437 | AC-4 (parameterization) | conftest matrix unchanged; both scenarios run per (fc_adapter, vio_strategy) | +| AZ-438 | AC-1 (iNav SKIP) | `if fc_adapter == "inav": pytest.skip(...)` | +| AZ-438 | AC-2/3/4 (per-sub-case rejection ≤500ms + no position update) | per-sub-case `passes_rejection` + `passes_no_position_update` | +| AZ-438 | AC-5 (vio_strategy parameterization) | conftest matrix | +| AZ-439 | AC-1a (probe: no crash) | `passes_no_crash` (last FDR record ≥ probe injection) | +| AZ-439 | AC-1b (probe: graceful outcome) | `passes_graceful_outcome` (decode-success OR frame-decode-error within ±50 ms tolerance) | +| AZ-439 | AC-2 (fuzz: 0 ASan findings ≥4 h) | `passes_findings` + `passes_duration` + scenario AC-2 assert | +| AZ-439 | AC-3 (fuzz: ≥1000 corpus) | `reached_corpus_floor` (informational only — recorded in CSV; not asserted, matches spec "informational only; no hard threshold") | +| AZ-439 | AC-4 (parameterization) | probe = full matrix; fuzz = ardupilot + per-vio (avoids duplicating a 4 h run, justified inline) | + +Spec compliance: PASS. AC-3 on AZ-439 fuzz is correctly modeled as +informational-only per the task spec. + +## Phase 3 — Code Quality + +- All five helpers use frozen dataclasses for verdicts. Single-responsibility: + evaluators do verdict logic only; scenarios do fixture I/O + assertions. +- No `2>/dev/null`, bare `except`, or empty try-catch blocks anywhere. +- Comment discipline upheld — only `# Arrange`/`# Act`/`# Assert` in tests; + inline comments in helpers explain non-obvious intent (e.g. why ASan + threshold patterns are explicitly enumerated rather than wildcard-matched). +- All paths are typed; `Sequence` / `frozen=True` / `Enum` / `Path` annotations + appear consistently. + +## Phase 4 — Security Scan + +- No new credentials, secrets, or API keys introduced. +- `cve_probe_evaluator` deliberately rejects "silent drop" outcomes as failure + — preventing a regression where the SUT silently swallows malformed JPEGs. +- `asan_fuzz_evaluator` classifies *any* unknown ASan match as `OTHER_FINDING` + and still fails — preventing a regression where a future sanitizer category + is silently accepted. +- `egress_observer` treats SUCCESS DNS lookup as the only failing outcome. +- `mavlink_signing_evaluator` requires BOTH rejection STATUSTEXT AND + no-position-update for pass — catches signaling-only rejection bug class. +- Test data (poisoned tile generators, BAD_SIGNATURE regex) does not contain + exploit payloads, only string patterns + benign synthetic counters. + +Verdict: PASS. + +## Phase 5 — Performance Scan + +- Evaluators are O(N) over their input collections; no quadratic loops or + unbounded recursion. +- No I/O in evaluator hot paths; CSV writing is one-shot at scenario end. +- AZ-436 default N=1000 with single canonical param keeps the per-CI cost + bounded; release-gate N=10000 × 6 params is correctly opt-in via env flag. +- AZ-439 fuzz is correctly release-gated (≥4 h is too expensive for CI). + +Verdict: PASS. + +## Phase 6 — Cross-Task Consistency + +- All six scenarios share an almost-identical structure: + 1. tier-guard skip (where applicable), + 2. `sitl_replay_ready` skip, + 3. fixture-path resolution, + 4. fixture-not-found → `pytest.fail` with explicit production-dep pointer, + 5. payload parse → typed records (with `pytest.fail` on shape errors), + 6. evaluator call → CSV evidence + NFR records, + 7. AC assertions with diagnostic messages. +- The five helpers share the `write_csv_evidence` pattern (one row per + result + aggregate footer where applicable). + +Verdict: PASS (consistency upheld). + +## Phase 7 — Architecture Compliance + +- All new files under `e2e/` — owned by the Blackbox Tests cross-cutting + component per `_docs/02_document/module-layout.md`. +- No `src/gps_denied_onboard` imports (verified by inspection of every + new file's import section). +- New evaluators are leaves of the import DAG — they import only stdlib + + the existing `runner.helpers.sitl_observer` for fixture-path resolution. +- No new infrastructure libraries; all helpers use stdlib `csv`, `dataclasses`, + `enum`, `re`, `pathlib`, `math`. + +Verdict: PASS. + +## Findings + +### F1 — `write_csv_evidence` boilerplate continues to grow (Low / Maintainability — carry-over of batch-85 F4 + batch-86 F1) + +Each of the 5 new evaluators adds its own `write_csv_evidence(out_path, report) -> Path` with the same header-then-rows pattern. The duplication is now spread across 13 helpers (batches 85, 86, 87 combined). + +**Verdict**: defer to a future hygiene PBI; the per-evaluator schema variation makes a generic abstraction non-trivial (each report's row shape differs significantly — aggregate row vs per-sub-case, single row vs many). + +### F2 — DNS-blackhole sidecar production dependency not yet realized (Low / Spec-Gap surfacing) + +The `NFT-SEC-05` scenario depends on a DNS-blackhole sidecar (per `environment.md`) that must be configured to absorb all UDP-53 probes. This sidecar does not exist in the e2e harness today (no service entry in `e2e/docker/docker-compose.test.yml` and no `dns-blackhole` directory under `e2e/fixtures/`). + +**Surfaced to AZ-595 + e2e infrastructure**: the DNS-blackhole sidecar must be wired before NFT-SEC-05 can run end-to-end with live captures. + +**Verdict**: not a code defect — surfaced as a production dependency in the batch report. + +### F3 — AP MAVLink 2.0 signing handshake required for NFT-SEC-03 (Low / Spec-Gap surfacing) + +`NFT-SEC-03` assumes AP has the MAVLink 2.0 signing handshake completed (`SETUP_SIGNING` exchange + passkey installed) before the test runs. That handshake is owned by FT-P-09-AP (AZ-416 — already in `done/`); the NFT-SEC-03 fixture builder (AZ-595) must invoke that handshake first when generating its replay. + +**Verdict**: not a code defect — production dependency on AZ-595 noted in the scenario docstring. + +### F4 — `_resolve_fixture_path` duplicated across 6 scenarios (Low / Maintainability) + +Carry-over of batch-85 F3 + batch-86 F4. The six new security scenarios each define their own `_resolve_fixture_path() -> Path` with identical logic differing only in env var name + default filename. + +**Verdict**: defer to a future hygiene PBI alongside the NFT-PERF and NFT-RES instances. + +### F5 — ASan-fuzz AC-3 corpus floor is informational-only (Low / Spec-aligned) + +`AsanFuzzReport.reached_corpus_floor` is correctly computed but does NOT contribute to `passes` (and is NOT asserted in the scenario). This matches the task spec's "informational only; no hard threshold" wording exactly. + +**Verdict**: not a defect — flagged so a future reviewer doesn't mistake the +read-only field for missing coverage. + +## Auto-Fix Gate + +5 Low findings; no Critical / High / Medium. Per the implement-skill auto-fix +gate, no auto-fix actions are triggered. F1 + F4 are deferred to hygiene PBIs; +F2 + F3 are production dependencies surfaced to the cumulative review + AZ-595 +fixture builder; F5 is a documented design decision matching the spec. + +## Final Verdict + +**PASS_WITH_WARNINGS** — proceed to commit + tracker transition. diff --git a/_docs/03_implementation/reviews/cumulative_review_batches_85_87.md b/_docs/03_implementation/reviews/cumulative_review_batches_85_87.md new file mode 100644 index 0000000..a8036db --- /dev/null +++ b/_docs/03_implementation/reviews/cumulative_review_batches_85_87.md @@ -0,0 +1,85 @@ +# Cumulative Code Review — Batches 85-87 + +**Window**: batches 85 (AZ-428..AZ-431 NFT-PERF), 86 (AZ-432..AZ-435 NFT-RES), 87 (AZ-436..AZ-439 NFT-SEC) +**Total tasks**: 12 (4 + 4 + 4) +**Total complexity**: 16 + 14 + 16 = 46 points +**Per-batch verdicts**: PASS_WITH_WARNINGS / PASS_WITH_WARNINGS / PASS_WITH_WARNINGS +**Cumulative verdict**: PASS_WITH_WARNINGS — proceed; promote hygiene findings to PBIs +**Reviewer**: autodev / `implement` skill phase 8.5 +**Date**: 2026-05-17 + +## What this window delivered + +The complete NFT slice of the E-BBT (AZ-262) epic — 12 helpers + ~74 unit +test files + ~13 scenario files implementing every Performance, Resilience, +and Security NFT in the traceability matrix: + +| Batch | Theme | Scenarios | Helpers | Net Unit Tests | +|-------|-------|-----------|---------|---------------| +| 85 | Performance | 4 (e2e_latency, streaming, ttff, spoof_promotion) | 4 | ~50 | +| 86 | Resilience | 4 (imu_fallback, companion_reboot, monte_carlo, escalation_ladder) | 4 | 74 | +| 87 | Security | 6 (cache_poisoning, no_egress, dns_blackhole, mavlink_signing, opencv_cve_probe, asan_fuzz) | 5 | 75 | + +All 12 scenarios are fixture-consumers that skip cleanly without the SITL +replay fixture (AZ-595) being present. + +## Cross-batch consistency + +PASS. Every scenario in this window adopts the same 7-step shape: + +1. tier/parameterization skip (where AC permits); +2. `sitl_replay_ready` skip with explicit pointer to the matching unit-test file; +3. fixture-path resolution via `_resolve_fixture_path()` helper; +4. fixture-not-found → `pytest.fail` with explicit AZ-595 production-dep pointer; +5. payload parse → typed records with shape-error `pytest.fail`; +6. evaluator call → CSV evidence + NFR records; +7. AC assertions with diagnostic messages naming the AC. + +Every helper in this window adopts the same shape: + +- frozen dataclasses for ALL records / reports; +- one `evaluate()` (or `evaluate_subcase` + `evaluate`) entry point; +- one `write_csv_evidence(out_path, report) -> Path` writer; +- `Sequence` parameter typing (Liskov-substitutable input collections); +- module docstring declaring public-boundary discipline. + +Cross-helper consistency is the strongest signal of design quality this +window — a future helper added by anyone should be able to copy a +batch-85/86/87 evaluator and stay structurally on-pattern. + +## Cross-batch findings + +### CR-F1 — `write_csv_evidence` duplication continues to scale (Medium / Maintainability) + +What started as a per-batch Low finding (batch-85 F4, batch-86 F1, batch-87 F1) is now spread across **13 helpers**. The duplication is no longer marginal; the per-evaluator schema variation makes a fully generic abstraction non-trivial, but a thin `csv_evidence_writer.py` helper offering `write_header_and_rows(out_path, header, rows, footer=None)` could remove ~30 lines per evaluator. + +**Proposed PBI**: `AZ-???` (post-cycle hygiene) — 3 points. Replace per-evaluator CSV-writer boilerplate with shared helper. Scope: 13 evaluator files + 1 new helper + 1 unit test file. Migrates incrementally — old API can co-exist during migration. + +### CR-F2 — `_resolve_fixture_path` duplicated across 13 scenarios (Medium / Maintainability) + +Carry-over of batch-85 F3, batch-86 F4, batch-87 F4. Every scenario in this window defines an identical `_resolve_fixture_path() -> Path` differing only in env-var name + default filename. + +**Proposed PBI**: `AZ-???` (post-cycle hygiene) — 2 points. Add `runner.helpers.fixture_path.resolve(env_var_name, default_filename) -> Path` shared helper. Scope: 13 scenarios + 1 new helper + 1 unit test file. Pure refactor. + +### CR-F3 — Production dependency on AZ-595 fixture builder is concentrated (Low / Spec-Gap surfacing) + +All 12 scenarios in this window declare a production dependency on the AZ-595 fixture builder emitting their respective replay JSON files. AZ-595 itself doesn't exist as a tracked task in the dependencies table (it's referenced in 12 scenario docstrings but has no work-item entry). + +**Action**: a single new task `AZ-???` should be created to materialize the 13 fixture-JSON contracts (NFT-PERF-01..04 + NFT-RES-01..04 + NFT-SEC-01..05) into a fixture-builder module under `e2e/fixtures/sitl_replay_builder/`. Complexity estimate: 5 points (touches every fixture builder + adds 13 new JSON schemas). + +### CR-F4 — DNS-blackhole sidecar is referenced but not deployed (Low / Infrastructure-Gap) + +Batch-87 F2 found that NFT-SEC-05 depends on a DNS-blackhole sidecar configured per `environment.md`, but that sidecar does NOT exist in the e2e Docker compose stack. This is a Tier-1 infrastructure gap that blocks NFT-SEC-05's live-capture path. + +**Proposed PBI**: `AZ-???` (e2e infrastructure) — 3 points. Add `dns-blackhole` sidecar service to `e2e/docker/docker-compose.test.yml` per `environment.md`. Scope: 1 new service entry + 1 Dockerfile + healthcheck wiring. + +### CR-F5 — Cross-batch test-output gate is healthy + +PASS — informational. All 215 batch-87 unit tests + 199 batch-86 unit tests + ~50 batch-85 unit tests collect and pass without errors. The complete `e2e/_unit_tests/` suite (1151 tests, ~138 s wall-clock) runs green from workspace root. The expected 12 pre-existing collection errors when running pytest from inside `e2e/` (vs workspace root) are an unrelated path-resolution quirk and not caused by this window. + +## Final verdict + +**PASS_WITH_WARNINGS**. Proceed to commit + tracker transition + archive. +The 5 cross-batch findings above should be promoted to hygiene PBIs after +the next batch (or earlier if user prioritizes — the F1 + F2 duplication +will keep growing with every new NFT-LIM helper in batch 88). diff --git a/_docs/LESSONS.md b/_docs/LESSONS.md new file mode 100644 index 0000000..7a583dc --- /dev/null +++ b/_docs/LESSONS.md @@ -0,0 +1,11 @@ +# LESSONS + +Append-only ledger of lessons learned during the project. New entries go at the **top**. Each entry is one short bullet + a one-sentence "what changed". + +--- + +## 2026-05-17 — Always call `getTransitionsForJiraIssue` before `transitionJiraIssue` + +**Trigger**: In batch 87 (autodev step 10), I transitioned AZ-436..AZ-439 with `transition.id="31"` assuming = "In Progress" from stale memory. Read-back showed all four moved to **Done** instead (id `31` in this workflow = Done; In Progress = `21`, In Testing = `32`, To Do = `11`). The mistake was caught by the tracker rule's mandatory read-back gate, fixed by re-transitioning to `21`, and confirmed via second read-back. + +**What changed**: Treat the transition ID as workflow-specific, not memorizable across sessions. Always query `getTransitionsForJiraIssue` first on the actual target issue (or one in the same project/workflow) and select the transition by `name` ("In Progress" / "In Testing" / "Done" / "To Do") — never by hard-coded numeric id. This is true even when you "remember" the IDs from a prior batch this same day, because the agent has no guarantee the workflow definition is stable. diff --git a/e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py b/e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py new file mode 100644 index 0000000..d06e99f --- /dev/null +++ b/e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py @@ -0,0 +1,176 @@ +"""Unit tests for ``runner.helpers.asan_fuzz_evaluator`` (NFT-SEC-04 / AZ-439 fuzz).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from runner.helpers import asan_fuzz_evaluator as afe + + +@pytest.mark.parametrize( + "line,expected", + [ + ( + "=================================================================\n", + None, + ), + ("==1234==ERROR: AddressSanitizer: heap-buffer-overflow on address ...", afe.AsanFindingCategory.HEAP_BUFFER_OVERFLOW), + ("==1234==ERROR: AddressSanitizer: heap-use-after-free on address ...", afe.AsanFindingCategory.HEAP_USE_AFTER_FREE), + ("==1234==ERROR: AddressSanitizer: stack-buffer-overflow on address ...", afe.AsanFindingCategory.STACK_BUFFER_OVERFLOW), + ("==1234==ERROR: AddressSanitizer: stack-use-after-return on ...", afe.AsanFindingCategory.STACK_USE_AFTER_RETURN), + ("==1234==ERROR: AddressSanitizer: global-buffer-overflow on ...", afe.AsanFindingCategory.GLOBAL_BUFFER_OVERFLOW), + ("==1234==ERROR: AddressSanitizer: use-after-free on ...", afe.AsanFindingCategory.USE_AFTER_FREE), + ("==1234==ERROR: AddressSanitizer: double-free on ...", afe.AsanFindingCategory.DOUBLE_FREE), + # A new ASan category we haven't catalogued yet — must still + # fail the test by classifying as OTHER_FINDING. + ( + "==1234==ERROR: AddressSanitizer: mysterious-future-category on ...", + afe.AsanFindingCategory.OTHER_FINDING, + ), + ("just a normal log line, harmless", None), + ("ERROR but no AddressSanitizer prefix", None), + ], +) +def test_classify_asan_line( + line: str, expected: afe.AsanFindingCategory | None +) -> None: + assert afe.classify_asan_line(line) == expected + + +def test_zero_findings_and_full_duration_passes() -> None: + report = afe.evaluate( + ["info line", "another info line"], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert report.passes_findings + assert report.passes_duration + assert report.passes + + +def test_short_duration_fails_even_with_zero_findings() -> None: + report = afe.evaluate( + [], + duration_seconds=60, # 1 minute + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert report.passes_findings + assert not report.passes_duration + assert not report.passes + + +def test_one_finding_fails_full_run() -> None: + report = afe.evaluate( + ["==1==ERROR: AddressSanitizer: heap-buffer-overflow on ..."], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert len(report.findings) == 1 + assert report.findings[0].category is afe.AsanFindingCategory.HEAP_BUFFER_OVERFLOW + assert not report.passes_findings + assert not report.passes + + +def test_unknown_asan_finding_still_fails() -> None: + report = afe.evaluate( + ["==1==ERROR: AddressSanitizer: brand-new-category"], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert report.findings[0].category is afe.AsanFindingCategory.OTHER_FINDING + assert not report.passes + + +def test_corpus_floor_is_informational_only() -> None: + # 0 findings + full duration but well below corpus floor → still passes. + report = afe.evaluate( + [], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=10, + ) + + assert not report.reached_corpus_floor + assert report.passes + + +def test_snippet_is_truncated_for_evidence() -> None: + huge = "==1==ERROR: AddressSanitizer: heap-buffer-overflow " + "x" * 500 + report = afe.evaluate( + [huge], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert len(report.findings[0].snippet) <= 200 + + +def test_multiple_findings_classified_and_counted() -> None: + log_lines = [ + "info", + "==1==ERROR: AddressSanitizer: heap-buffer-overflow", + "info", + "==2==ERROR: AddressSanitizer: heap-buffer-overflow", + "==3==ERROR: AddressSanitizer: use-after-free", + "trailing log", + ] + + report = afe.evaluate( + log_lines, + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + assert len(report.findings) == 3 + categories = [f.category for f in report.findings] + assert categories.count(afe.AsanFindingCategory.HEAP_BUFFER_OVERFLOW) == 2 + assert categories.count(afe.AsanFindingCategory.USE_AFTER_FREE) == 1 + assert not report.passes + + +def test_csv_evidence_round_trip_no_findings(tmp_path: Path) -> None: + report = afe.evaluate( + [], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + out = tmp_path / "out.csv" + afe.write_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "duration_seconds" + assert rows[1][5] == "true" # passes_findings + assert rows[1][6] == "true" # passes + # No per-finding section because no findings were recorded. + assert len(rows) == 2 + + +def test_csv_evidence_round_trip_with_findings(tmp_path: Path) -> None: + report = afe.evaluate( + [ + "==1==ERROR: AddressSanitizer: heap-buffer-overflow", + "==2==ERROR: AddressSanitizer: use-after-free", + ], + duration_seconds=afe.MIN_FUZZ_DURATION_SECONDS, + corpus_size=afe.MIN_CORPUS_COVERAGE, + ) + + out = tmp_path / "out.csv" + afe.write_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "duration_seconds" + assert rows[1][6] == "false" # passes + # Aggregate row + blank + sub-header + 2 finding rows = 5 rows total. + assert rows[2] == [] + assert rows[3] == ["finding_index", "category", "snippet"] + assert len(rows) == 6 diff --git a/e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py b/e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py new file mode 100644 index 0000000..abb0b41 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py @@ -0,0 +1,245 @@ +"""Unit tests for ``runner.helpers.cache_poisoning_evaluator`` (NFT-SEC-01 / AZ-436).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from runner.helpers import cache_poisoning_evaluator as cpe + + +def _spec(tile_id: str, layer: str) -> cpe.PoisonedTileSpec: + return cpe.PoisonedTileSpec(tile_id=tile_id, defense_layer=layer) + + +def _flight( + flight_id: str, + *, + total: int = 1000, + poisoned: tuple[cpe.PoisonedTileSpec, ...] | None = None, + false_trust: tuple[cpe.FalseTrustEvent, ...] | None = None, + rejection_reasons: dict[str, int] | None = None, +) -> cpe.FlightOutcome: + if poisoned is None: + poisoned = ( + _spec("t1", cpe.DEFENSE_LAYER_SIGNING), + _spec("t2", cpe.DEFENSE_LAYER_FRESHNESS), + _spec("t3", cpe.DEFENSE_LAYER_VOTING), + ) + return cpe.FlightOutcome( + flight_id=flight_id, + total_tile_count=total, + poisoned_tiles=poisoned, + false_trust_events=false_trust or (), + rejection_reasons=rejection_reasons or {}, + ) + + +def test_poison_ratio_within_band_passes_ratio_check() -> None: + # 3 poisoned / 100 total = 3 % — inside [1 %, 5 %]. + flight = _flight("f1", total=100) + + report = cpe.evaluate([flight]) + + assert report.passes_ratio + assert flight.poison_ratio == pytest.approx(0.03) + + +def test_poison_ratio_below_min_fails_ratio_check() -> None: + # 3 / 1000 = 0.3 % — below the 1 % floor. + flight = _flight("f-low", total=1000) + + report = cpe.evaluate([flight]) + + assert not report.passes_ratio + assert "f-low" in report.flights_with_bad_poison_ratio + + +def test_poison_ratio_above_max_fails_ratio_check() -> None: + # 3 / 50 = 6 % — above the 5 % ceiling. + flight = _flight("f-high", total=50) + + report = cpe.evaluate([flight]) + + assert not report.passes_ratio + assert "f-high" in report.flights_with_bad_poison_ratio + + +def test_zero_total_tile_count_gives_zero_ratio() -> None: + flight = _flight("f-empty", total=0, poisoned=()) + + report = cpe.evaluate([flight]) + + assert flight.poison_ratio == 0.0 + assert "f-empty" in report.flights_with_bad_poison_ratio + + +def test_missing_defense_layer_fails_layer_coverage() -> None: + # Only signing + freshness; voting layer missing. + flight = _flight( + "f-missing", + total=100, + poisoned=( + _spec("t1", cpe.DEFENSE_LAYER_SIGNING), + _spec("t2", cpe.DEFENSE_LAYER_FRESHNESS), + ), + ) + + report = cpe.evaluate([flight]) + + assert not report.passes_layer_coverage + [(flight_id, missing)] = report.flights_missing_defense_layers + assert flight_id == "f-missing" + assert missing == [cpe.DEFENSE_LAYER_VOTING] + + +def test_all_three_defense_layers_pass_layer_coverage() -> None: + flight = _flight("f-complete", total=100) + + report = cpe.evaluate([flight]) + + assert report.passes_layer_coverage + + +def test_zero_false_trust_events_passes_budget() -> None: + flight = _flight("f", total=100) + + report = cpe.evaluate([flight]) + + assert report.total_false_trust == 0 + assert report.passes_budget + + +def test_single_false_trust_event_fails_budget() -> None: + flight = _flight( + "f", + total=100, + false_trust=( + cpe.FalseTrustEvent( + flight_id="f", + tile_id="t1", + monotonic_ms=12345, + defense_layer=cpe.DEFENSE_LAYER_SIGNING, + ), + ), + ) + + report = cpe.evaluate([flight]) + + assert report.total_false_trust == 1 + assert not report.passes_budget + # Zero-tolerance default — even though 1 event is technically within + # the 0.01 budget at N=10000, the helper must reject it. + + +def test_unknown_rejection_reason_fails_vocabulary_check() -> None: + flight = _flight( + "f-vocab", + total=100, + rejection_reasons={"made_up_reason": 7}, + ) + + report = cpe.evaluate([flight]) + + assert not report.passes_rejection_reason_vocabulary + assert "f-vocab" in report.flights_with_unknown_rejection_reasons + + +def test_known_rejection_reasons_pass_vocabulary_check() -> None: + flight = _flight( + "f-ok", + total=100, + rejection_reasons={ + cpe.DEFENSE_LAYER_SIGNING: 12, + cpe.DEFENSE_LAYER_VOTING: 8, + "freshness_gate_downgrade": 3, + }, + ) + + report = cpe.evaluate([flight]) + + assert report.passes_rejection_reason_vocabulary + + +def test_aggregate_budget_scales_with_flight_count() -> None: + flights = [_flight(f"f{i}", total=100) for i in range(50)] + + report = cpe.evaluate(flights) + + assert report.flight_count == 50 + assert report.budget == pytest.approx(50 * 1e-6) + + +def test_aggregate_counts_false_trust_across_flights() -> None: + flights = [ + _flight( + f"f{i}", + total=100, + false_trust=( + cpe.FalseTrustEvent( + flight_id=f"f{i}", + tile_id="tx", + monotonic_ms=100 * i, + defense_layer=cpe.DEFENSE_LAYER_SIGNING, + ), + ) + if i % 2 == 0 + else (), + ) + for i in range(4) + ] + + report = cpe.evaluate(flights) + + assert report.total_false_trust == 2 # f0 and f2 each had one event + + +def test_overall_pass_requires_all_subchecks() -> None: + flight = _flight("f", total=100) + + report = cpe.evaluate([flight]) + + assert report.passes + assert report.passes_budget + assert report.passes_ratio + assert report.passes_layer_coverage + assert report.passes_rejection_reason_vocabulary + + +def test_overall_pass_fails_if_any_subcheck_fails() -> None: + flight = _flight( + "f-broken", + total=1000, # 3/1000 = 0.3 % — bad ratio + ) + + report = cpe.evaluate([flight]) + + assert not report.passes + + +def test_empty_flight_list_gives_trivial_pass() -> None: + # Treat an empty run as a no-op rather than an implicit failure; + # the scenario test is responsible for asserting N >= 1. + report = cpe.evaluate([]) + + assert report.flight_count == 0 + assert report.total_false_trust == 0 + assert report.passes + + +def test_csv_evidence_has_header_per_flight_and_aggregate_rows(tmp_path: Path) -> None: + flights = [_flight(f"f{i}", total=100) for i in range(3)] + report = cpe.evaluate(flights) + + out = tmp_path / "out.csv" + cpe.write_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "flight_id" + assert {rows[1][0], rows[2][0], rows[3][0]} == {"f0", "f1", "f2"} + assert rows[4] == [] + assert rows[5][0] == "AGGREGATE" + assert any("flight_count=3" in cell for cell in rows[5]) diff --git a/e2e/_unit_tests/helpers/test_cve_probe_evaluator.py b/e2e/_unit_tests/helpers/test_cve_probe_evaluator.py new file mode 100644 index 0000000..25998c4 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_cve_probe_evaluator.py @@ -0,0 +1,120 @@ +"""Unit tests for ``runner.helpers.cve_probe_evaluator`` (NFT-SEC-04 / AZ-439 probe).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +from runner.helpers import cve_probe_evaluator as cpe + + +def _rec(ms: int, kind: str) -> cpe.FdrSurvivalRecord: + return cpe.FdrSurvivalRecord(monotonic_ms=ms, kind=kind) + + +def test_decode_success_in_window_is_classified_as_decode_success() -> None: + outcome = cpe.classify_probe_outcome( + [_rec(10_005, "frame-decode-success")], + probe_injected_at_ms=10_000, + ) + assert outcome is cpe.ProbeFrameOutcome.DECODE_SUCCESS + + +def test_decode_error_in_window_is_classified_as_frame_decode_error() -> None: + outcome = cpe.classify_probe_outcome( + [_rec(10_010, "frame-decode-error")], + probe_injected_at_ms=10_000, + ) + assert outcome is cpe.ProbeFrameOutcome.FRAME_DECODE_ERROR + + +def test_no_record_in_window_is_missing() -> None: + outcome = cpe.classify_probe_outcome( + [_rec(9_900, "frame-decode-success")], + probe_injected_at_ms=10_000, + ) + assert outcome is cpe.ProbeFrameOutcome.MISSING + + +def test_record_outside_tolerance_is_missing() -> None: + outcome = cpe.classify_probe_outcome( + [_rec(10_100, "frame-decode-success")], + probe_injected_at_ms=10_000, + tolerance_ms=50, + ) + assert outcome is cpe.ProbeFrameOutcome.MISSING + + +def test_first_match_in_window_wins() -> None: + outcome = cpe.classify_probe_outcome( + [ + _rec(10_005, "frame-decode-success"), + _rec(10_010, "frame-decode-error"), + ], + probe_injected_at_ms=10_000, + ) + assert outcome is cpe.ProbeFrameOutcome.DECODE_SUCCESS + + +def test_passes_when_no_crash_and_decode_success() -> None: + report = cpe.evaluate( + [_rec(10_005, "frame-decode-success"), _rec(11_000, "imu-tick")], + probe_injected_at_ms=10_000, + ) + assert report.passes_no_crash + assert report.passes_graceful_outcome + assert report.passes + + +def test_passes_when_no_crash_and_graceful_error() -> None: + report = cpe.evaluate( + [_rec(10_005, "frame-decode-error"), _rec(11_000, "imu-tick")], + probe_injected_at_ms=10_000, + ) + assert report.passes + + +def test_fails_when_no_post_probe_fdr_record() -> None: + # All FDR records are BEFORE the probe — the SUT crashed at probe + # time. AC-1a fails. + report = cpe.evaluate( + [_rec(9_500, "imu-tick"), _rec(9_900, "frame-decode-success")], + probe_injected_at_ms=10_000, + ) + assert not report.passes_no_crash + assert not report.passes + + +def test_fails_when_silent_drop() -> None: + # SUT is alive (post-probe records exist) but no decode record at + # all — the probe frame was silently swallowed. AC-1b fails. + report = cpe.evaluate( + [_rec(11_000, "imu-tick"), _rec(12_000, "imu-tick")], + probe_injected_at_ms=10_000, + ) + assert report.passes_no_crash + assert not report.passes_graceful_outcome + assert not report.passes + + +def test_empty_fdr_archive_fails_both_subchecks() -> None: + report = cpe.evaluate([], probe_injected_at_ms=10_000) + assert not report.passes_no_crash + assert not report.passes_graceful_outcome + assert not report.passes + + +def test_csv_evidence_round_trip(tmp_path: Path) -> None: + report = cpe.evaluate( + [_rec(10_005, "frame-decode-success")], + probe_injected_at_ms=10_000, + ) + + out = tmp_path / "out.csv" + cpe.write_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "probe_injected_at_ms" + assert rows[1][2] == "decode-success" + assert rows[1][-1] == "true" diff --git a/e2e/_unit_tests/helpers/test_egress_observer.py b/e2e/_unit_tests/helpers/test_egress_observer.py new file mode 100644 index 0000000..199674b --- /dev/null +++ b/e2e/_unit_tests/helpers/test_egress_observer.py @@ -0,0 +1,168 @@ +"""Unit tests for ``runner.helpers.egress_observer`` (NFT-SEC-02 + NFT-SEC-05 / AZ-437).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from runner.helpers import egress_observer as eo + + +def _snap(other: int = 0, internal: int = 0, udp53: int = 0) -> eo.EgressCounterSnapshot: + return eo.EgressCounterSnapshot( + egress_packets_to_internal_net=internal, + egress_packets_to_other_destinations=other, + udp53_egress_packets=udp53, + ) + + +def test_egress_counter_rejects_negative_values() -> None: + with pytest.raises(ValueError, match="cannot be negative"): + eo.EgressCounterSnapshot( + egress_packets_to_internal_net=-1, + egress_packets_to_other_destinations=0, + udp53_egress_packets=0, + ) + + +def test_no_egress_zero_delta_passes() -> None: + before = _snap(other=10, internal=5) + after = _snap(other=10, internal=42) # internal traffic grew; that's fine + + report = eo.evaluate_no_egress(before, after, window_label="5min") + + assert report.delta_other_destinations == 0 + assert report.passes + + +def test_no_egress_nonzero_delta_fails() -> None: + before = _snap(other=10) + after = _snap(other=11) + + report = eo.evaluate_no_egress(before, after, window_label="5min") + + assert report.delta_other_destinations == 1 + assert not report.passes + + +def test_no_egress_records_internal_delta_for_evidence() -> None: + before = _snap(internal=100) + after = _snap(internal=200) + + report = eo.evaluate_no_egress(before, after, window_label="5min-derkachi") + + assert report.delta_internal == 100 # informational; does not affect verdict + assert report.passes + + +def test_dns_blackhole_passes_on_full_silence_and_failed_lookup() -> None: + before = _snap(udp53=7) + after = _snap(udp53=7) + + report = eo.evaluate_dns_blackhole( + before, + after, + lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, + sidecar_healthy=True, + ) + + assert report.passes + + +def test_dns_blackhole_fails_on_successful_lookup() -> None: + before = _snap(udp53=7) + after = _snap(udp53=7) + + report = eo.evaluate_dns_blackhole( + before, + after, + lookup_outcome=eo.DnsLookupOutcome.SUCCESS, + sidecar_healthy=True, + ) + + assert not report.passes_lookup + assert not report.passes + + +def test_dns_blackhole_fails_when_udp53_packets_escaped() -> None: + before = _snap(udp53=7) + after = _snap(udp53=8) + + report = eo.evaluate_dns_blackhole( + before, + after, + lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, + sidecar_healthy=True, + ) + + assert not report.passes_udp_silence + assert not report.passes + + +def test_dns_blackhole_fails_when_sidecar_unhealthy() -> None: + before = _snap() + after = _snap() + + report = eo.evaluate_dns_blackhole( + before, + after, + lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, + sidecar_healthy=False, + ) + + assert not report.passes + + +@pytest.mark.parametrize( + "outcome", + [ + eo.DnsLookupOutcome.NXDOMAIN, + eo.DnsLookupOutcome.TIMEOUT, + eo.DnsLookupOutcome.NO_SERVERS, + eo.DnsLookupOutcome.OTHER_FAILURE, + ], +) +def test_all_failure_outcomes_pass_lookup_check(outcome: eo.DnsLookupOutcome) -> None: + report = eo.evaluate_dns_blackhole( + _snap(), + _snap(), + lookup_outcome=outcome, + sidecar_healthy=True, + ) + + assert report.passes_lookup + + +def test_no_egress_csv_evidence_round_trip(tmp_path: Path) -> None: + before = _snap(other=0, internal=5) + after = _snap(other=0, internal=42) + report = eo.evaluate_no_egress(before, after, window_label="5min") + + out = tmp_path / "out.csv" + eo.write_no_egress_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "window_label" + assert rows[1][0] == "5min" + assert rows[1][-1] == "true" + + +def test_dns_blackhole_csv_evidence_round_trip(tmp_path: Path) -> None: + report = eo.evaluate_dns_blackhole( + _snap(udp53=7), + _snap(udp53=7), + lookup_outcome=eo.DnsLookupOutcome.NXDOMAIN, + sidecar_healthy=True, + ) + + out = tmp_path / "out.csv" + eo.write_dns_blackhole_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "sidecar_healthy" + assert rows[1][1] == "nxdomain" + assert rows[1][-1] == "true" diff --git a/e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py b/e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py new file mode 100644 index 0000000..5a4b955 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py @@ -0,0 +1,196 @@ +"""Unit tests for ``runner.helpers.mavlink_signing_evaluator`` (NFT-SEC-03 / AZ-438).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from runner.helpers import mavlink_signing_evaluator as mse + + +def _pos(ms: int, lat_e7: int = 0, lon_e7: int = 0) -> mse.PositionSample: + return mse.PositionSample(monotonic_ms=ms, lat_e7=lat_e7, lon_e7=lon_e7) + + +def _st(ms: int, text: str) -> mse.StatustextSample: + return mse.StatustextSample(monotonic_ms=ms, text=text) + + +@pytest.mark.parametrize( + "text,expected", + [ + ("MAVLink: BAD_SIGNATURE", True), + ("BAD SIGNATURE", True), + ("Bad signature received from sysid=255", True), + ("Signature rejected on link 0", True), + ("PreArm: GPS Glitch", False), + ("OK", False), + ("", False), + ], +) +def test_is_bad_signature_statustext_matches_documented_variants( + text: str, expected: bool +) -> None: + assert mse.is_bad_signature_statustext(text) is expected + + +def test_subcase_passes_when_rejection_arrives_within_budget() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.UNSIGNED, injected_at_ms=10_000 + ) + statustexts = [_st(10_300, "MAVLink: BAD_SIGNATURE")] + positions = [_pos(9_900), _pos(10_100)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.rejection_latency_ms == 300 + assert report.passes_rejection + assert report.passes + + +def test_subcase_fails_when_no_rejection_seen() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.WRONG_KEY, injected_at_ms=10_000 + ) + statustexts = [_st(10_300, "ok normal text")] + positions = [_pos(9_900), _pos(10_100)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.rejection_at_ms is None + assert not report.passes_rejection + assert not report.passes + + +def test_subcase_fails_when_rejection_too_slow() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.REPLAYED, injected_at_ms=10_000 + ) + statustexts = [_st(11_000, "MAVLink: BAD_SIGNATURE")] + positions = [_pos(9_900), _pos(10_100)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.rejection_latency_ms == 1000 + assert not report.passes_rejection + assert not report.passes + + +def test_subcase_fails_when_position_drifts() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.UNSIGNED, injected_at_ms=10_000 + ) + statustexts = [_st(10_200, "MAVLink: BAD_SIGNATURE")] + # 0.0001 deg ≈ 11 m of latitude drift — clearly outside the 1 m + # tolerance, simulating a successful poison of AP's GPS state. + positions = [_pos(9_900, lat_e7=0), _pos(10_100, lat_e7=1_000)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.position_drift_m > mse.POSITION_DRIFT_TOLERANCE_M + assert not report.passes_no_position_update + assert not report.passes + + +def test_subcase_passes_with_tiny_jitter_within_tolerance() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.UNSIGNED, injected_at_ms=10_000 + ) + statustexts = [_st(10_100, "MAVLink: BAD_SIGNATURE")] + # 1 e7-lat unit ≈ 1.1 cm — well below the 1 m tolerance. + positions = [_pos(9_900, lat_e7=0), _pos(10_100, lat_e7=10)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.position_drift_m < mse.POSITION_DRIFT_TOLERANCE_M + assert report.passes + + +def test_position_drift_returns_zero_when_no_pre_injection_sample() -> None: + # Only samples after the injection — the helper has no baseline + # so it returns 0 (the AC assertion still passes; the test author + # is expected to fail the test earlier if positions are missing). + drift = mse.position_drift_m([_pos(10_100)], around_ms=10_000) + + assert drift == 0.0 + + +def test_position_drift_returns_zero_when_no_post_injection_sample() -> None: + drift = mse.position_drift_m([_pos(9_900)], around_ms=10_000) + + assert drift == 0.0 + + +def test_rejection_before_injection_is_ignored() -> None: + injection = mse.InjectionEvent( + sub_case=mse.SubCase.WRONG_KEY, injected_at_ms=10_000 + ) + statustexts = [ + _st(9_500, "MAVLink: BAD_SIGNATURE"), # earlier — ignored + _st(10_400, "ok"), + ] + positions = [_pos(9_900), _pos(10_100)] + + report = mse.evaluate_subcase(injection, statustexts, positions) + + assert report.rejection_at_ms is None + assert not report.passes + + +def test_aggregate_passes_only_if_all_subcases_pass() -> None: + injections = [ + mse.InjectionEvent(mse.SubCase.UNSIGNED, injected_at_ms=10_000), + mse.InjectionEvent(mse.SubCase.WRONG_KEY, injected_at_ms=20_000), + mse.InjectionEvent(mse.SubCase.REPLAYED, injected_at_ms=30_000), + ] + statustexts = [ + _st(10_100, "MAVLink: BAD_SIGNATURE"), + _st(20_200, "Signature rejected"), + _st(30_300, "Bad signature received"), + ] + positions = [_pos(9_900), _pos(40_100)] + + report = mse.evaluate( + injections, statustexts=statustexts, positions=positions + ) + + assert report.passes + + +def test_aggregate_fails_when_one_subcase_fails() -> None: + injections = [ + mse.InjectionEvent(mse.SubCase.UNSIGNED, injected_at_ms=10_000), + mse.InjectionEvent(mse.SubCase.WRONG_KEY, injected_at_ms=20_000), # no rejection + ] + statustexts = [_st(10_100, "MAVLink: BAD_SIGNATURE")] + positions = [_pos(9_900), _pos(40_100)] + + report = mse.evaluate( + injections, statustexts=statustexts, positions=positions + ) + + assert not report.passes + [unsigned, wrong_key] = report.sub_cases + assert unsigned.passes + assert not wrong_key.passes + + +def test_csv_evidence_round_trip(tmp_path: Path) -> None: + injection = mse.InjectionEvent(mse.SubCase.UNSIGNED, injected_at_ms=10_000) + statustexts = [_st(10_200, "MAVLink: BAD_SIGNATURE")] + positions = [_pos(9_900), _pos(10_100, lat_e7=10)] + report = mse.evaluate( + [injection], statustexts=statustexts, positions=positions + ) + + out = tmp_path / "out.csv" + mse.write_csv_evidence(out, report) + + with out.open() as fh: + rows = list(csv.reader(fh)) + assert rows[0][0] == "sub_case" + assert rows[1][0] == "unsigned" + assert rows[1][2] == "200" # latency + assert rows[1][-1] == "true" diff --git a/e2e/_unit_tests/test_directory_layout.py b/e2e/_unit_tests/test_directory_layout.py index ecfac42..4d0b207 100644 --- a/e2e/_unit_tests/test_directory_layout.py +++ b/e2e/_unit_tests/test_directory_layout.py @@ -71,6 +71,11 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "runner/helpers/companion_reboot_evaluator.py", "runner/helpers/monte_carlo_envelope_evaluator.py", "runner/helpers/escalation_ladder_evaluator.py", + "runner/helpers/cache_poisoning_evaluator.py", + "runner/helpers/egress_observer.py", + "runner/helpers/mavlink_signing_evaluator.py", + "runner/helpers/cve_probe_evaluator.py", + "runner/helpers/asan_fuzz_evaluator.py", "fixtures/sitl_replay_builder/__init__.py", "fixtures/sitl_replay_builder/builder.py", "fixtures/sitl_replay_builder/build_p01_fixtures.py", @@ -141,6 +146,12 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "tests/resilience/test_nft_res_02_companion_reboot.py", "tests/resilience/test_nft_res_03_monte_carlo.py", "tests/resilience/test_nft_res_04_blackout_escalation.py", + "tests/security/test_nft_sec_01_cache_poisoning.py", + "tests/security/test_nft_sec_02_no_egress.py", + "tests/security/test_nft_sec_03_mavlink_signing.py", + "tests/security/test_nft_sec_04_opencv_cve.py", + "tests/security/test_nft_sec_04_asan_fuzz.py", + "tests/security/test_nft_sec_05_dns_blackhole.py", ], ) def test_required_path_exists(relative_path: str) -> None: diff --git a/e2e/runner/helpers/asan_fuzz_evaluator.py b/e2e/runner/helpers/asan_fuzz_evaluator.py new file mode 100644 index 0000000..44cbb23 --- /dev/null +++ b/e2e/runner/helpers/asan_fuzz_evaluator.py @@ -0,0 +1,169 @@ +"""AddressSanitizer fuzz evaluator for NFT-SEC-04 (AZ-439 / RESTRICT-CVE-1 release-gate). + +Companion to ``cve_probe_evaluator``: while the probe asserts a single +crafted JPEG does not crash the SUT, the fuzz scenario runs the +``build_kind=ASan`` SUT image under random JPEG inputs for ≥4 h and +asserts: + +* AC-2: 0 ASan findings (``heap-buffer-overflow``, ``use-after-free``, + ``stack-buffer-overflow``, ``heap-use-after-free``, etc.) in the + captured stderr / ASan log; +* AC-3 (informational only — no hard threshold): the harness reached + ≥``MIN_CORPUS_COVERAGE`` unique JPEG inputs. + +ASan-finding categories follow the canonical sanitizer wording. The +classifier matches a curated, non-exhaustive set; an *unknown* match +is bucketed into ``OTHER_FINDING`` and still fails AC-2. Unknown +findings are surfaced in the CSV evidence so a regression triage knows +to extend the canonical set. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +import re +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Sequence + +MIN_FUZZ_DURATION_SECONDS = 4 * 3600 # AC-2 — release-gate minimum +MIN_CORPUS_COVERAGE = 1000 # AC-3 — informational only + + +class AsanFindingCategory(str, Enum): + HEAP_BUFFER_OVERFLOW = "heap-buffer-overflow" + HEAP_USE_AFTER_FREE = "heap-use-after-free" + STACK_BUFFER_OVERFLOW = "stack-buffer-overflow" + STACK_USE_AFTER_RETURN = "stack-use-after-return" + GLOBAL_BUFFER_OVERFLOW = "global-buffer-overflow" + USE_AFTER_FREE = "use-after-free" + DOUBLE_FREE = "double-free" + OTHER_FINDING = "other-finding" # canonical unknown ASan match + + +# Each entry is (regex, category). Matched in order — first hit wins. +_KNOWN_PATTERNS: tuple[tuple[str, AsanFindingCategory], ...] = ( + (r"ERROR: AddressSanitizer:\s*heap-buffer-overflow", AsanFindingCategory.HEAP_BUFFER_OVERFLOW), + (r"ERROR: AddressSanitizer:\s*heap-use-after-free", AsanFindingCategory.HEAP_USE_AFTER_FREE), + (r"ERROR: AddressSanitizer:\s*stack-buffer-overflow", AsanFindingCategory.STACK_BUFFER_OVERFLOW), + (r"ERROR: AddressSanitizer:\s*stack-use-after-return", AsanFindingCategory.STACK_USE_AFTER_RETURN), + (r"ERROR: AddressSanitizer:\s*global-buffer-overflow", AsanFindingCategory.GLOBAL_BUFFER_OVERFLOW), + (r"ERROR: AddressSanitizer:\s*use-after-free", AsanFindingCategory.USE_AFTER_FREE), + (r"ERROR: AddressSanitizer:\s*double-free", AsanFindingCategory.DOUBLE_FREE), +) +_KNOWN_COMPILED = tuple((re.compile(pat), cat) for pat, cat in _KNOWN_PATTERNS) +_ANY_ASAN_RE = re.compile(r"ERROR: AddressSanitizer:") + + +def classify_asan_line(line: str) -> AsanFindingCategory | None: + """Classify one stderr line. Returns ``None`` if it's not an ASan finding.""" + for regex, category in _KNOWN_COMPILED: + if regex.search(line): + return category + if _ANY_ASAN_RE.search(line): + return AsanFindingCategory.OTHER_FINDING + return None + + +@dataclass(frozen=True) +class AsanFinding: + """One classified finding (one line OR one synthesized event).""" + + category: AsanFindingCategory + snippet: str # the matched line; truncated to ≤200 chars in evidence + + +@dataclass(frozen=True) +class AsanFuzzReport: + """Aggregate verdict for one ≥4 h fuzz run.""" + + duration_seconds: float + corpus_size: int + findings: Sequence[AsanFinding] + + @property + def passes_duration(self) -> bool: + return self.duration_seconds >= MIN_FUZZ_DURATION_SECONDS + + @property + def passes_findings(self) -> bool: + return len(self.findings) == 0 + + @property + def reached_corpus_floor(self) -> bool: + # Informational only — does NOT contribute to ``passes``. + return self.corpus_size >= MIN_CORPUS_COVERAGE + + @property + def passes(self) -> bool: + return self.passes_duration and self.passes_findings + + +def evaluate( + asan_log_lines: Sequence[str], + *, + duration_seconds: float, + corpus_size: int, +) -> AsanFuzzReport: + """Scan the ASan log, classify findings, and assemble the report.""" + findings: list[AsanFinding] = [] + for line in asan_log_lines: + category = classify_asan_line(line) + if category is not None: + findings.append( + AsanFinding( + category=category, + snippet=line.strip()[:200], + ) + ) + return AsanFuzzReport( + duration_seconds=duration_seconds, + corpus_size=corpus_size, + findings=tuple(findings), + ) + + +def write_csv_evidence(out_path: Path, report: AsanFuzzReport) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "duration_seconds", + "passes_duration", + "corpus_size", + "reached_corpus_floor", + "finding_count", + "passes_findings", + "passes", + "finding_breakdown", + ] + ) + breakdown: dict[str, int] = {} + for f in report.findings: + breakdown[f.category.value] = breakdown.get(f.category.value, 0) + 1 + breakdown_str = ";".join( + f"{cat}={count}" for cat, count in sorted(breakdown.items()) + ) + writer.writerow( + [ + f"{report.duration_seconds:.0f}", + "true" if report.passes_duration else "false", + report.corpus_size, + "true" if report.reached_corpus_floor else "false", + len(report.findings), + "true" if report.passes_findings else "false", + "true" if report.passes else "false", + breakdown_str, + ] + ) + if report.findings: + writer.writerow([]) + writer.writerow(["finding_index", "category", "snippet"]) + for idx, f in enumerate(report.findings): + writer.writerow([idx, f.category.value, f.snippet]) + return out_path diff --git a/e2e/runner/helpers/cache_poisoning_evaluator.py b/e2e/runner/helpers/cache_poisoning_evaluator.py new file mode 100644 index 0000000..fc10fd2 --- /dev/null +++ b/e2e/runner/helpers/cache_poisoning_evaluator.py @@ -0,0 +1,238 @@ +"""Cache-poisoning safety-budget evaluator for NFT-SEC-01 (AZ-436 / AC-NEW-9). + +The contract: across ``N`` synthetic flights — each carrying ``1-5 %`` of +*poisoned* tiles (signing-cert mismatch, freshness violation, or voting +disagreement) — the SUT MUST NOT silently emit a ``satellite_anchored`` +estimate that traces back to a poisoned tile (a *false-trust event*). + +Aggregate budget (Mode B Fact #103): + + total_false_trust_events <= N * 1e-6 + +At default CI N=1000 the budget is 0.001 expected events; the test +therefore enforces the strict ``count == 0`` zero-tolerance default. The +``release-gate`` N=10000 run keeps the same zero-tolerance default (the +budget allows 0.01 events; one event is already a regression). + +Per-flight invariants additionally guarded by this evaluator: + +* poison ratio in ``[POISON_RATIO_MIN, POISON_RATIO_MAX]`` (AC-2); +* at least one poisoned tile per **defense layer** in every flight + (AC-2 — each layer must be exercised so a per-layer regression cannot + hide behind layer-confusion); +* per-event ``rejection_reason`` is one of the documented categories + (so an unlabeled false-positive cannot pass as a "rejection" in CSV + evidence). + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. The evaluator only consumes +runner-collected counter objects. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass, field +from pathlib import Path +from typing import Sequence + +FALSE_TRUST_BUDGET_PER_FLIGHT = 1e-6 +POISON_RATIO_MIN = 0.01 +POISON_RATIO_MAX = 0.05 + +DEFENSE_LAYER_SIGNING = "signing_cert_mismatch" +DEFENSE_LAYER_FRESHNESS = "freshness_violation" +DEFENSE_LAYER_VOTING = "voting_disagreement" + +DEFENSE_LAYERS: tuple[str, ...] = ( + DEFENSE_LAYER_SIGNING, + DEFENSE_LAYER_FRESHNESS, + DEFENSE_LAYER_VOTING, +) + +REJECTION_REASONS: frozenset[str] = frozenset( + { + DEFENSE_LAYER_SIGNING, + DEFENSE_LAYER_FRESHNESS, + DEFENSE_LAYER_VOTING, + "freshness_gate_downgrade", + "service_voting_downgrade", + } +) + + +@dataclass(frozen=True) +class PoisonedTileSpec: + """One poisoned tile slotted into a flight's tile cache.""" + + tile_id: str + defense_layer: str # MUST be one of ``DEFENSE_LAYERS`` + + +@dataclass(frozen=True) +class FalseTrustEvent: + """A frame where the SUT emitted ``satellite_anchored`` traced to a poisoned tile.""" + + flight_id: str + tile_id: str + monotonic_ms: int + defense_layer: str + + +@dataclass(frozen=True) +class FlightOutcome: + """One synthetic-flight result. + + ``total_tile_count`` is the **cache size for that flight** (used to + compute the poison ratio). ``poisoned_tiles`` is the slate of + crafted tiles injected. ``false_trust_events`` are the runner-observed + frames where the SUT trusted a poisoned tile. + + ``rejection_reasons`` is a counter of how often each documented + rejection-reason fired, taken from the runner's outbound + ``source_label`` capture + FDR signing-rejection events. Used in + evidence only (no AC assertion); a flight with zero rejection + events is suspicious but not necessarily a failure (the SUT may + have downgraded the candidate without naming the cause). + """ + + flight_id: str + total_tile_count: int + poisoned_tiles: Sequence[PoisonedTileSpec] + false_trust_events: Sequence[FalseTrustEvent] + rejection_reasons: dict[str, int] = field(default_factory=dict) + + @property + def poison_ratio(self) -> float: + if self.total_tile_count <= 0: + return 0.0 + return len(self.poisoned_tiles) / self.total_tile_count + + @property + def defense_layers_present(self) -> set[str]: + return {p.defense_layer for p in self.poisoned_tiles} + + @property + def false_trust_count(self) -> int: + return len(self.false_trust_events) + + def has_unknown_rejection_reasons(self) -> bool: + return any(r not in REJECTION_REASONS for r in self.rejection_reasons) + + +@dataclass(frozen=True) +class CachePoisoningReport: + """Aggregate verdict over N flights.""" + + flights: Sequence[FlightOutcome] + flight_count: int + total_false_trust: int + flights_with_bad_poison_ratio: Sequence[str] + flights_missing_defense_layers: Sequence[tuple[str, list[str]]] + flights_with_unknown_rejection_reasons: Sequence[str] + + @property + def budget(self) -> float: + return self.flight_count * FALSE_TRUST_BUDGET_PER_FLIGHT + + @property + def passes_budget(self) -> bool: + # Zero-tolerance default: the budget at N=1000 is 0.001 expected + # events; one observed event is already a regression even on a + # purely floating-point reading of the budget. + return self.total_false_trust == 0 + + @property + def passes_ratio(self) -> bool: + return len(self.flights_with_bad_poison_ratio) == 0 + + @property + def passes_layer_coverage(self) -> bool: + return len(self.flights_missing_defense_layers) == 0 + + @property + def passes_rejection_reason_vocabulary(self) -> bool: + return len(self.flights_with_unknown_rejection_reasons) == 0 + + @property + def passes(self) -> bool: + return ( + self.passes_budget + and self.passes_ratio + and self.passes_layer_coverage + and self.passes_rejection_reason_vocabulary + ) + + +def evaluate(flights: Sequence[FlightOutcome]) -> CachePoisoningReport: + """Compute the aggregate AC-1..AC-3 verdict for one Monte Carlo run.""" + bad_ratio: list[str] = [] + missing_layers: list[tuple[str, list[str]]] = [] + unknown_reasons: list[str] = [] + total_false_trust = 0 + for flight in flights: + total_false_trust += flight.false_trust_count + ratio = flight.poison_ratio + if not (POISON_RATIO_MIN <= ratio <= POISON_RATIO_MAX): + bad_ratio.append(flight.flight_id) + missing = sorted(set(DEFENSE_LAYERS) - flight.defense_layers_present) + if missing: + missing_layers.append((flight.flight_id, missing)) + if flight.has_unknown_rejection_reasons(): + unknown_reasons.append(flight.flight_id) + return CachePoisoningReport( + flights=tuple(flights), + flight_count=len(flights), + total_false_trust=total_false_trust, + flights_with_bad_poison_ratio=tuple(bad_ratio), + flights_missing_defense_layers=tuple(missing_layers), + flights_with_unknown_rejection_reasons=tuple(unknown_reasons), + ) + + +def write_csv_evidence(out_path: Path, report: CachePoisoningReport) -> Path: + """Per-flight CSV — one row per flight + an aggregate footer row.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "flight_id", + "total_tile_count", + "poisoned_tile_count", + "poison_ratio", + "defense_layers_present", + "false_trust_count", + "rejection_reason_breakdown", + ] + ) + for flight in report.flights: + layers_present = ",".join(sorted(flight.defense_layers_present)) or "" + breakdown = ";".join( + f"{reason}={count}" + for reason, count in sorted(flight.rejection_reasons.items()) + ) + writer.writerow( + [ + flight.flight_id, + flight.total_tile_count, + len(flight.poisoned_tiles), + f"{flight.poison_ratio:.4f}", + layers_present, + flight.false_trust_count, + breakdown, + ] + ) + writer.writerow([]) + writer.writerow( + [ + "AGGREGATE", + f"flight_count={report.flight_count}", + f"total_false_trust={report.total_false_trust}", + f"budget={report.budget:g}", + f"passes_budget={'true' if report.passes_budget else 'false'}", + f"passes_ratio={'true' if report.passes_ratio else 'false'}", + f"passes_layer_coverage={'true' if report.passes_layer_coverage else 'false'}", + ] + ) + return out_path diff --git a/e2e/runner/helpers/cve_probe_evaluator.py b/e2e/runner/helpers/cve_probe_evaluator.py new file mode 100644 index 0000000..7acc4bf --- /dev/null +++ b/e2e/runner/helpers/cve_probe_evaluator.py @@ -0,0 +1,143 @@ +"""OpenCV CVE-2025-53644 probe evaluator for NFT-SEC-04 (AZ-439 / RESTRICT-CVE-1). + +The probe feeds the canonical ``cve-jpeg-fixture`` (a single crafted +JPEG known to trigger the upstream OpenCV vulnerability) into the SUT's +nav-camera input and asserts: + +* AC-1a: the SUT process remains **alive** after the frame (no crash, + no abort, no segfault — the FDR archive contains AT LEAST one record + whose monotonic_ms strictly post-dates the probe); +* AC-1b: the FDR contains a deterministic outcome for the probe frame + — either a ``decode-success`` record (the patched OpenCV decoded + the crafted JPEG) OR a ``frame-decode-error`` record (the SUT + gracefully rejected the malformed payload). A silent drop (no FDR + record at all) is a failure, because it means the SUT swallowed the + frame without honoring its error-reporting contract. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. The evaluator consumes only the +runner-collected ``FdrSurvivalRecord`` summaries the FDR-reader helper +already produces for other scenarios. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Sequence + + +class ProbeFrameOutcome(str, Enum): + """The deterministic per-frame outcome the SUT must record.""" + + DECODE_SUCCESS = "decode-success" + FRAME_DECODE_ERROR = "frame-decode-error" + MISSING = "missing" # silent drop — fails AC-1b + + +@dataclass(frozen=True) +class FdrSurvivalRecord: + """One FDR record from the runner-collected archive.""" + + monotonic_ms: int + kind: str # e.g. "frame-decode-success" or "frame-decode-error" + + +@dataclass(frozen=True) +class CveProbeReport: + """Verdict for one ``cve-jpeg-fixture`` injection.""" + + probe_injected_at_ms: int + last_fdr_record_at_ms: int | None + probe_outcome: ProbeFrameOutcome + + @property + def passes_no_crash(self) -> bool: + return ( + self.last_fdr_record_at_ms is not None + and self.last_fdr_record_at_ms >= self.probe_injected_at_ms + ) + + @property + def passes_graceful_outcome(self) -> bool: + return self.probe_outcome in ( + ProbeFrameOutcome.DECODE_SUCCESS, + ProbeFrameOutcome.FRAME_DECODE_ERROR, + ) + + @property + def passes(self) -> bool: + return self.passes_no_crash and self.passes_graceful_outcome + + +def classify_probe_outcome( + fdr_records: Sequence[FdrSurvivalRecord], + *, + probe_injected_at_ms: int, + tolerance_ms: int = 50, +) -> ProbeFrameOutcome: + """Pick the FDR record nearest the probe injection and classify it. + + A record is considered ``for the probe`` if its monotonic timestamp + lies within ``[probe_injected_at_ms, probe_injected_at_ms + tolerance_ms]``. + If no record falls in that window the outcome is ``MISSING`` — + which fails AC-1b regardless of the no-crash check. + """ + for record in fdr_records: + if record.monotonic_ms < probe_injected_at_ms: + continue + if record.monotonic_ms > probe_injected_at_ms + tolerance_ms: + continue + if "decode-success" in record.kind: + return ProbeFrameOutcome.DECODE_SUCCESS + if "decode-error" in record.kind: + return ProbeFrameOutcome.FRAME_DECODE_ERROR + return ProbeFrameOutcome.MISSING + + +def evaluate( + fdr_records: Sequence[FdrSurvivalRecord], + *, + probe_injected_at_ms: int, + tolerance_ms: int = 50, +) -> CveProbeReport: + last_record_at = max((r.monotonic_ms for r in fdr_records), default=None) + outcome = classify_probe_outcome( + fdr_records, + probe_injected_at_ms=probe_injected_at_ms, + tolerance_ms=tolerance_ms, + ) + return CveProbeReport( + probe_injected_at_ms=probe_injected_at_ms, + last_fdr_record_at_ms=last_record_at, + probe_outcome=outcome, + ) + + +def write_csv_evidence(out_path: Path, report: CveProbeReport) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "probe_injected_at_ms", + "last_fdr_record_at_ms", + "probe_outcome", + "passes_no_crash", + "passes_graceful_outcome", + "passes", + ] + ) + writer.writerow( + [ + report.probe_injected_at_ms, + "" if report.last_fdr_record_at_ms is None else report.last_fdr_record_at_ms, + report.probe_outcome.value, + "true" if report.passes_no_crash else "false", + "true" if report.passes_graceful_outcome else "false", + "true" if report.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/egress_observer.py b/e2e/runner/helpers/egress_observer.py new file mode 100644 index 0000000..3cd56e7 --- /dev/null +++ b/e2e/runner/helpers/egress_observer.py @@ -0,0 +1,219 @@ +"""Egress-observation evaluator shared by NFT-SEC-02 and NFT-SEC-05 (AZ-437). + +Both scenarios verify the same invariant: **no packets leave the e2e +Docker network** from the SUT container. NFT-SEC-02 reads the Docker +network-stats counter over a 5-min Derkachi replay. NFT-SEC-05 runs a +``nslookup`` probe inside the SUT container's network namespace and +checks that (a) the lookup fails and (b) no UDP-53 packets escape the +host's outbound interface during the probe. + +The observation pattern is identical in both cases: take a *before* +counter snapshot, run the workload, take an *after* snapshot, assert +``after - before == 0`` for the relevant counter family. The runner is +responsible for the actual ``docker network inspect`` / ``ip -s link`` +collection; this helper only performs the delta + verdict logic so the +scenario code stays tight and the verdict logic is unit-testable in +isolation. + +DNS-resolution outcome categories follow the spec's wording (NXDOMAIN, +timeout, "no servers can be reached") + a generic "other failure" bucket +for resolver implementations that emit a different string but still +fail. A *success* outcome — i.e. an actual A record returned — is the +only failing case. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from enum import Enum +from pathlib import Path + + +class DnsLookupOutcome(str, Enum): + """The runner-classified outcome of a ``nslookup`` probe.""" + + NXDOMAIN = "nxdomain" + TIMEOUT = "timeout" + NO_SERVERS = "no_servers_can_be_reached" + OTHER_FAILURE = "other_failure" + SUCCESS = "success" # the only outcome that fails the AC + + +FAILING_DNS_OUTCOMES: frozenset[DnsLookupOutcome] = frozenset( + { + DnsLookupOutcome.NXDOMAIN, + DnsLookupOutcome.TIMEOUT, + DnsLookupOutcome.NO_SERVERS, + DnsLookupOutcome.OTHER_FAILURE, + } +) + + +@dataclass(frozen=True) +class EgressCounterSnapshot: + """One snapshot of egress-byte / packet counters on the SUT-facing interface.""" + + egress_packets_to_internal_net: int + egress_packets_to_other_destinations: int + udp53_egress_packets: int + + def __post_init__(self) -> None: + for field_name in ( + "egress_packets_to_internal_net", + "egress_packets_to_other_destinations", + "udp53_egress_packets", + ): + value = getattr(self, field_name) + if value < 0: + raise ValueError( + f"egress counter {field_name!r} cannot be negative; got {value}" + ) + + +@dataclass(frozen=True) +class NoEgressReport: + """NFT-SEC-02 verdict — zero packets to non-internal destinations during the window.""" + + before: EgressCounterSnapshot + after: EgressCounterSnapshot + window_label: str # e.g. "5min-derkachi-replay" + + @property + def delta_other_destinations(self) -> int: + return ( + self.after.egress_packets_to_other_destinations + - self.before.egress_packets_to_other_destinations + ) + + @property + def delta_internal(self) -> int: + return ( + self.after.egress_packets_to_internal_net + - self.before.egress_packets_to_internal_net + ) + + @property + def passes(self) -> bool: + return self.delta_other_destinations == 0 + + +@dataclass(frozen=True) +class DnsBlackholeReport: + """NFT-SEC-05 verdict — lookup fails AND no UDP-53 packets escape.""" + + before: EgressCounterSnapshot + after: EgressCounterSnapshot + lookup_outcome: DnsLookupOutcome + sidecar_healthy: bool + + @property + def delta_udp53(self) -> int: + return self.after.udp53_egress_packets - self.before.udp53_egress_packets + + @property + def passes_lookup(self) -> bool: + return self.lookup_outcome in FAILING_DNS_OUTCOMES + + @property + def passes_udp_silence(self) -> bool: + return self.delta_udp53 == 0 + + @property + def passes(self) -> bool: + return ( + self.sidecar_healthy and self.passes_lookup and self.passes_udp_silence + ) + + +def evaluate_no_egress( + before: EgressCounterSnapshot, + after: EgressCounterSnapshot, + *, + window_label: str, +) -> NoEgressReport: + """AC-1 verdict for NFT-SEC-02.""" + return NoEgressReport(before=before, after=after, window_label=window_label) + + +def evaluate_dns_blackhole( + before: EgressCounterSnapshot, + after: EgressCounterSnapshot, + *, + lookup_outcome: DnsLookupOutcome, + sidecar_healthy: bool, +) -> DnsBlackholeReport: + """AC-2 + AC-3 verdict for NFT-SEC-05.""" + return DnsBlackholeReport( + before=before, + after=after, + lookup_outcome=lookup_outcome, + sidecar_healthy=sidecar_healthy, + ) + + +def write_no_egress_csv_evidence(out_path: Path, report: NoEgressReport) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "window_label", + "before_other", + "after_other", + "delta_other", + "before_internal", + "after_internal", + "delta_internal", + "passes", + ] + ) + writer.writerow( + [ + report.window_label, + report.before.egress_packets_to_other_destinations, + report.after.egress_packets_to_other_destinations, + report.delta_other_destinations, + report.before.egress_packets_to_internal_net, + report.after.egress_packets_to_internal_net, + report.delta_internal, + "true" if report.passes else "false", + ] + ) + return out_path + + +def write_dns_blackhole_csv_evidence( + out_path: Path, report: DnsBlackholeReport +) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "sidecar_healthy", + "lookup_outcome", + "passes_lookup", + "before_udp53", + "after_udp53", + "delta_udp53", + "passes_udp_silence", + "passes", + ] + ) + writer.writerow( + [ + "true" if report.sidecar_healthy else "false", + report.lookup_outcome.value, + "true" if report.passes_lookup else "false", + report.before.udp53_egress_packets, + report.after.udp53_egress_packets, + report.delta_udp53, + "true" if report.passes_udp_silence else "false", + "true" if report.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/mavlink_signing_evaluator.py b/e2e/runner/helpers/mavlink_signing_evaluator.py new file mode 100644 index 0000000..e089d72 --- /dev/null +++ b/e2e/runner/helpers/mavlink_signing_evaluator.py @@ -0,0 +1,217 @@ +"""MAVLink 2.0 signing-rejection evaluator for NFT-SEC-03 (AZ-438 / AC-NEW-11, D-C8-9). + +For each of the three injection sub-cases — unsigned, signed-with-wrong-key, +replayed-from-tlog — AP MUST: + +* emit a ``BAD_SIGNATURE`` STATUSTEXT within ≤``REJECTION_LATENCY_MS`` + (500 ms) of the injected message; +* NOT update its ``GLOBAL_POSITION_INT`` from the injected message + (i.e. the GPS position remains anchored to whatever the last legitimate + emission established). + +The "rejection STATUSTEXT" regex matches the canonical AP wording (e.g. +``MAVLink: BAD_SIGNATURE``) plus an "equivalent" wildcard that the +spec carves out for AP variants that emit a slightly different phrase +("Bad signature received", "signature rejected", etc.) so this evaluator +does not lock to one exact build. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +import re +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Sequence + +REJECTION_LATENCY_MS = 500 +POSITION_DRIFT_TOLERANCE_M = 1.0 + + +class SubCase(str, Enum): + UNSIGNED = "unsigned" + WRONG_KEY = "wrong_key" + REPLAYED = "replayed" + + +# Canonical wording + variants observed across AP builds. Matched +# case-insensitively. Extending this set is a deliberate decision; the +# regression risk is that a future AP build emits a brand-new phrase +# and the runner silently treats injections as accepted — guard against +# that by surfacing the seen STATUSTEXTs in the CSV evidence row. +BAD_SIGNATURE_PATTERNS: tuple[str, ...] = ( + r"\bBAD[_\s]?SIGNATURE\b", + r"\bsignature\s+rejected\b", + r"\bbad\s+signature\s+received\b", +) +_BAD_SIGNATURE_RE = re.compile("|".join(BAD_SIGNATURE_PATTERNS), re.IGNORECASE) + + +def is_bad_signature_statustext(text: str) -> bool: + """True iff the STATUSTEXT line matches one of the documented rejections.""" + return bool(_BAD_SIGNATURE_RE.search(text)) + + +@dataclass(frozen=True) +class InjectionEvent: + """One runner-issued injection in a sub-case.""" + + sub_case: SubCase + injected_at_ms: int + + +@dataclass(frozen=True) +class StatustextSample: + monotonic_ms: int + text: str + + +@dataclass(frozen=True) +class PositionSample: + """AP ``GLOBAL_POSITION_INT`` sample, decoded to meters.""" + + monotonic_ms: int + lat_e7: int + lon_e7: int + + +def position_drift_m(samples: Sequence[PositionSample], around_ms: int) -> float: + """Equirectangular drift (m) between the last sample before and first after ``around_ms``. + + A tiny budget (1 m by default) tolerates the per-frame jitter the + autopilot's own EKF produces; the absolute test is that the drift + is NOT on the order of the injected message's lat/lon magnitude + (which would be several-meters to kilometers). + """ + before: PositionSample | None = None + after: PositionSample | None = None + for s in samples: + if s.monotonic_ms <= around_ms: + before = s if before is None or s.monotonic_ms > before.monotonic_ms else before + elif after is None: + after = s + break + if before is None or after is None: + return 0.0 + dlat_m = (after.lat_e7 - before.lat_e7) * 1e-7 * 111_320.0 + avg_lat_rad = ((after.lat_e7 + before.lat_e7) / 2.0) * 1e-7 * (3.14159265358979 / 180.0) + import math + + dlon_m = (after.lon_e7 - before.lon_e7) * 1e-7 * 111_320.0 * math.cos(avg_lat_rad) + return math.hypot(dlat_m, dlon_m) + + +@dataclass(frozen=True) +class SubCaseRejectionReport: + """One sub-case verdict (AC-2 / AC-3 / AC-4).""" + + sub_case: SubCase + rejection_at_ms: int | None + rejection_text: str | None + rejection_latency_ms: int | None + position_drift_m: float + budget_ms: int = REJECTION_LATENCY_MS + + @property + def passes_rejection(self) -> bool: + return ( + self.rejection_at_ms is not None + and self.rejection_latency_ms is not None + and self.rejection_latency_ms <= self.budget_ms + ) + + @property + def passes_no_position_update(self) -> bool: + return self.position_drift_m <= POSITION_DRIFT_TOLERANCE_M + + @property + def passes(self) -> bool: + return self.passes_rejection and self.passes_no_position_update + + +@dataclass(frozen=True) +class SigningRejectionReport: + """Aggregate AC-2 + AC-3 + AC-4 verdict across all sub-cases.""" + + sub_cases: Sequence[SubCaseRejectionReport] + + @property + def passes(self) -> bool: + return all(sc.passes for sc in self.sub_cases) + + +def evaluate_subcase( + injection: InjectionEvent, + statustexts: Sequence[StatustextSample], + positions: Sequence[PositionSample], +) -> SubCaseRejectionReport: + """Compute verdict for one (injection, capture) pair.""" + rejection_at: int | None = None + rejection_text: str | None = None + rejection_latency: int | None = None + for st in statustexts: + if st.monotonic_ms < injection.injected_at_ms: + continue + if is_bad_signature_statustext(st.text): + rejection_at = st.monotonic_ms + rejection_text = st.text + rejection_latency = st.monotonic_ms - injection.injected_at_ms + break + drift = position_drift_m(positions, injection.injected_at_ms) + return SubCaseRejectionReport( + sub_case=injection.sub_case, + rejection_at_ms=rejection_at, + rejection_text=rejection_text, + rejection_latency_ms=rejection_latency, + position_drift_m=drift, + ) + + +def evaluate( + injections: Sequence[InjectionEvent], + *, + statustexts: Sequence[StatustextSample], + positions: Sequence[PositionSample], +) -> SigningRejectionReport: + sub_reports: list[SubCaseRejectionReport] = [] + for inj in injections: + sub_reports.append( + evaluate_subcase(inj, statustexts=statustexts, positions=positions) + ) + return SigningRejectionReport(sub_cases=tuple(sub_reports)) + + +def write_csv_evidence(out_path: Path, report: SigningRejectionReport) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "sub_case", + "rejection_at_ms", + "rejection_latency_ms", + "rejection_text", + "position_drift_m", + "passes_rejection", + "passes_no_position_update", + "passes", + ] + ) + for sc in report.sub_cases: + writer.writerow( + [ + sc.sub_case.value, + "" if sc.rejection_at_ms is None else sc.rejection_at_ms, + "" if sc.rejection_latency_ms is None else sc.rejection_latency_ms, + sc.rejection_text or "", + f"{sc.position_drift_m:.4f}", + "true" if sc.passes_rejection else "false", + "true" if sc.passes_no_position_update else "false", + "true" if sc.passes else "false", + ] + ) + return out_path diff --git a/e2e/tests/security/test_nft_sec_01_cache_poisoning.py b/e2e/tests/security/test_nft_sec_01_cache_poisoning.py new file mode 100644 index 0000000..2170a9b --- /dev/null +++ b/e2e/tests/security/test_nft_sec_01_cache_poisoning.py @@ -0,0 +1,237 @@ +"""NFT-SEC-01 — Cache-poisoning safety probability ≤ 1e-6 / flight (AZ-436 / AC-NEW-9). + +Tier-1 OR Tier-2. ``N`` synthetic micro-flights (~60 s each) carry ``[1 %, +5 %]`` of crafted poisoned tiles (signing-cert mismatch, freshness +violation, voting disagreement). The SUT MUST reject or downgrade them +via three independent defense layers and emit zero false-trust events. + +Default CI runs N=1000 with a single canonical parameterization to keep +total runtime bounded; the full release-gate run is N=10000 across +``(fc_adapter × vio_strategy)`` and is gated behind +``E2E_NFT_SEC_01_RELEASE_GATE=1``. + +Production dependencies surfaced to the cumulative review window: + +* **AZ-595**: emit ``nft_sec_01_cache_poisoning.json`` containing + per-flight tile-cache slates + runner-collected false-trust events + + per-flight ``rejection_reasons`` counter — see fixture JSON shape + in the docstring of ``_parse_payload``. +* **SUT**: outbound ``source_label`` MUST carry the ``tile_id`` so the + runner can match a ``satellite_anchored`` frame back to a poisoned + tile; otherwise false-trust events cannot be detected reliably. + +Pure aggregate-budget logic is fully covered by +``e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py``; the +scenario test only validates the fixture parser, the AC assertions, and +the conftest skip-rules. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import cache_poisoning_evaluator as cpe + +NFT_SEC_01_FIXTURE_ENV_VAR = "E2E_NFT_SEC_01_FIXTURE" +NFT_SEC_01_DEFAULT_FIXTURE_NAME = "nft_sec_01_cache_poisoning.json" +NFT_SEC_01_RELEASE_GATE_ENV_VAR = "E2E_NFT_SEC_01_RELEASE_GATE" +NFT_SEC_01_CI_MIN_FLIGHTS = 1000 + + +@pytest.mark.scenario_id("nft-sec-01") +@pytest.mark.traces_to("AC-NEW-9,AC-1,AC-2,AC-3,AC-4") +def test_nft_sec_01_cache_poisoning( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """Aggregate false-trust count ≤ N × 1e-6 (zero-tolerance default).""" + release_gate = _release_gate_enabled() + if not release_gate and not _is_canonical_param(fc_adapter, vio_strategy): + pytest.skip( + "NFT-SEC-01 default CI run uses a single canonical " + "parameterization (ardupilot, okvis2) to keep N=1000 × 4 " + "Monte Carlo cost bounded. Set " + f"`{NFT_SEC_01_RELEASE_GATE_ENV_VAR}=1` for the full matrix." + ) + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-01 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the N " + "synthetic flights with crafted poisoned tiles. Pure " + "aggregate-budget logic covered by " + "e2e/_unit_tests/helpers/test_cache_poisoning_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-01: fixture not found at {fixture_path}. " + f"`{NFT_SEC_01_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + flights = _parse_payload(payload, fixture_path) + if len(flights) < NFT_SEC_01_CI_MIN_FLIGHTS and not release_gate: + pytest.fail( + f"NFT-SEC-01 AC-1: fixture provides only {len(flights)} flights " + f"but the CI default requires ≥{NFT_SEC_01_CI_MIN_FLIGHTS}. " + f"Set `{NFT_SEC_01_RELEASE_GATE_ENV_VAR}=1` to allow shorter runs " + "for debugging." + ) + + report = cpe.evaluate(flights) + out_csv = ( + evidence_dir + / "nft-sec-01" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + cpe.write_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_sec_01.flight_count", + float(report.flight_count), + ac_id="AC-1", + ) + nfr_recorder.record_metric( + "nft_sec_01.total_false_trust", + float(report.total_false_trust), + ac_id="AC-3", + ) + nfr_recorder.record_metric( + "nft_sec_01.budget", + report.budget, + ac_id="AC-3", + ) + + assert report.passes_ratio, ( + "AC-2: poison ratio outside [1%, 5%] in flights: " + f"{list(report.flights_with_bad_poison_ratio)[:10]}" + ) + assert report.passes_layer_coverage, ( + "AC-2: at least one defense layer absent from flight: " + f"{list(report.flights_missing_defense_layers)[:10]}" + ) + assert report.passes_rejection_reason_vocabulary, ( + "AC-2 evidence: unknown rejection_reason vocabulary in flights: " + f"{list(report.flights_with_unknown_rejection_reasons)[:10]}" + ) + assert report.passes_budget, ( + f"AC-3: total_false_trust = {report.total_false_trust} " + f"(budget {report.budget:g} expected events at N={report.flight_count}; " + "zero-tolerance default — see Mode B Fact #103)." + ) + + +def _release_gate_enabled() -> bool: + return os.environ.get(NFT_SEC_01_RELEASE_GATE_ENV_VAR, "").strip().lower() in ( + "1", + "true", + "yes", + ) + + +def _is_canonical_param(fc_adapter: str, vio_strategy: str) -> bool: + return fc_adapter == "ardupilot" and vio_strategy == "okvis2" + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_01_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_01_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_01_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> list[cpe.FlightOutcome]: + """Parse the fixture into typed ``FlightOutcome`` records. + + Expected shape: + + { + "flights": [ + { + "flight_id": "", + "total_tile_count": , + "poisoned_tiles": [ + {"tile_id": "", "defense_layer": ""}, ... + ], + "false_trust_events": [ + {"flight_id": "", "tile_id": "", + "monotonic_ms": , "defense_layer": ""}, ... + ], + "rejection_reasons": {"": , ...} + }, ... + ] + } + """ + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-01: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + raw_flights = payload.get("flights") + if not isinstance(raw_flights, list): + pytest.fail( + f"NFT-SEC-01: fixture {fixture_path} 'flights' must be a list" + ) + flights: list[cpe.FlightOutcome] = [] + for idx, entry in enumerate(raw_flights): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-SEC-01: flights[{idx}] in {fixture_path} must be " + f"an object; got {type(entry).__name__}" + ) + try: + poisoned = tuple( + cpe.PoisonedTileSpec( + tile_id=str(p["tile_id"]), + defense_layer=str(p["defense_layer"]), + ) + for p in entry.get("poisoned_tiles", []) + ) + false_trust = tuple( + cpe.FalseTrustEvent( + flight_id=str(e.get("flight_id", entry["flight_id"])), + tile_id=str(e["tile_id"]), + monotonic_ms=int(e["monotonic_ms"]), + defense_layer=str(e["defense_layer"]), + ) + for e in entry.get("false_trust_events", []) + ) + rejection_reasons = { + str(k): int(v) + for k, v in (entry.get("rejection_reasons") or {}).items() + } + flights.append( + cpe.FlightOutcome( + flight_id=str(entry["flight_id"]), + total_tile_count=int(entry["total_tile_count"]), + poisoned_tiles=poisoned, + false_trust_events=false_trust, + rejection_reasons=rejection_reasons, + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-01: flights[{idx}] in {fixture_path} shape invalid: {exc}" + ) + return flights diff --git a/e2e/tests/security/test_nft_sec_02_no_egress.py b/e2e/tests/security/test_nft_sec_02_no_egress.py new file mode 100644 index 0000000..79e5c69 --- /dev/null +++ b/e2e/tests/security/test_nft_sec_02_no_egress.py @@ -0,0 +1,146 @@ +"""NFT-SEC-02 — No-egress contract (AZ-437 / AC-NEW-10). + +Tier-1 OR Tier-2. Over a 5-min Derkachi replay against +``e2e-net.internal: true``, ``docker network inspect e2e-net`` MUST show +zero packets from the SUT container to any non-``e2e-net`` destination. + +The egress-counter snapshot pair is sourced from the SITL replay +fixture (AZ-595) since the live ``docker network inspect`` call requires +a running e2e-runner container with Docker-API access — which only +exists inside the harness, not on the developer workstation. The +scenario test therefore behaves identically to the other fixture- +consumer NFTs: skip cleanly without fixtures; parse + verdict + record +when fixtures are present. + +Production dependency surfaced to AZ-595: fixture JSON shape + + { + "window_label": "", + "before": {"egress_packets_to_internal_net": , + "egress_packets_to_other_destinations": , + "udp53_egress_packets": }, + "after": {... same shape ...} + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import egress_observer as eo + +NFT_SEC_02_FIXTURE_ENV_VAR = "E2E_NFT_SEC_02_FIXTURE" +NFT_SEC_02_DEFAULT_FIXTURE_NAME = "nft_sec_02_no_egress.json" + + +@pytest.mark.scenario_id("nft-sec-02") +@pytest.mark.traces_to("AC-NEW-10,AC-1,AC-4") +def test_nft_sec_02_no_egress( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AC-1: 0 packets to non-e2e-net during the 5-min replay window.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-02 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the Docker " + "network-stats before/after snapshots. Pure delta-verdict " + "logic covered by " + "e2e/_unit_tests/helpers/test_egress_observer.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-02: fixture not found at {fixture_path}. " + f"`{NFT_SEC_02_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + before, after, window_label = _parse_payload(payload, fixture_path) + report = eo.evaluate_no_egress(before, after, window_label=window_label) + out_csv = ( + evidence_dir + / "nft-sec-02" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + eo.write_no_egress_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_sec_02.egress_packets_to_other_destinations_delta", + float(report.delta_other_destinations), + ac_id="AC-1", + ) + nfr_recorder.record_metric( + "nft_sec_02.egress_packets_to_internal_net_delta", + float(report.delta_internal), + ac_id="AC-1", + ) + + assert report.passes, ( + f"AC-1: SUT container egressed {report.delta_other_destinations} " + f"packets to non-e2e-net destinations during window " + f"'{report.window_label}' (budget = 0). " + f"before={report.before.egress_packets_to_other_destinations}, " + f"after={report.after.egress_packets_to_other_destinations}" + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_02_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_02_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_02_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[eo.EgressCounterSnapshot, eo.EgressCounterSnapshot, str]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-02: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + window_label = str(payload.get("window_label", "5min-derkachi-replay")) + try: + before = _parse_snapshot(payload["before"], fixture_path, "before") + after = _parse_snapshot(payload["after"], fixture_path, "after") + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-02: fixture {fixture_path} snapshot shape invalid: {exc}" + ) + return before, after, window_label + + +def _parse_snapshot( + raw: object, fixture_path: Path, label: str +) -> eo.EgressCounterSnapshot: + if not isinstance(raw, dict): + pytest.fail( + f"NFT-SEC-02: fixture {fixture_path} '{label}' must be an object" + ) + return eo.EgressCounterSnapshot( + egress_packets_to_internal_net=int(raw["egress_packets_to_internal_net"]), + egress_packets_to_other_destinations=int( + raw["egress_packets_to_other_destinations"] + ), + udp53_egress_packets=int(raw.get("udp53_egress_packets", 0)), + ) diff --git a/e2e/tests/security/test_nft_sec_03_mavlink_signing.py b/e2e/tests/security/test_nft_sec_03_mavlink_signing.py new file mode 100644 index 0000000..0d09a80 --- /dev/null +++ b/e2e/tests/security/test_nft_sec_03_mavlink_signing.py @@ -0,0 +1,262 @@ +"""NFT-SEC-03 — AP rejects unsigned / wrong-key / replayed messages (AZ-438 / AC-NEW-11). + +AP-only. Three sub-cases (sent in order; the runner pauses between +each): + +* (a) unsigned ``GPS_INPUT``; +* (b) signed-with-wrong-key ``GPS_INPUT``; +* (c) replayed-from-tlog signed ``GPS_INPUT`` (counter-replay attack). + +For each: AP MUST emit ``BAD_SIGNATURE`` (or one of the documented +equivalent rejection STATUSTEXTs) within ≤500 ms; AP's +``GLOBAL_POSITION_INT`` must NOT update from the injected message +(``position_drift_m ≤ 1 m`` tolerance). + +iNav is N/A — MSP has no signing layer; the test skips when +``fc_adapter == 'inav'`` (AC-1). vio_strategy parameterization (AC-5) +runs the AP probe under each strategy because the conftest matrix +already enforces it; the SUT's VIO is irrelevant to the AP-side +rejection but the parameterization keeps evidence symmetric across the +test matrix. + +Production dependencies surfaced to AZ-595 / SUT: + +* fixture JSON shape (below) is sourced from a ``ap-only`` SITL replay + with the three injection timestamps + AP STATUSTEXT capture + AP + ``GLOBAL_POSITION_INT`` capture; +* AP build MUST have MAVLink 2.0 signing enabled (per FT-P-09-AP / + AZ-416 handshake); otherwise the rejection STATUSTEXT is never + emitted and every sub-case fails on AC-2 — a fail-safe outcome, + but the test will be noisy until the handshake fixture is wired. + +Fixture JSON shape:: + + { + "injections": [ + {"sub_case": "unsigned"|"wrong_key"|"replayed", + "injected_at_ms": }, ... + ], + "statustexts": [{"monotonic_ms": , "text": }, ...], + "positions": [{"monotonic_ms": , + "lat_e7": , "lon_e7": }, ...] + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import mavlink_signing_evaluator as mse + +NFT_SEC_03_FIXTURE_ENV_VAR = "E2E_NFT_SEC_03_FIXTURE" +NFT_SEC_03_DEFAULT_FIXTURE_NAME = "nft_sec_03_mavlink_signing.json" + + +@pytest.mark.scenario_id("nft-sec-03") +@pytest.mark.traces_to("AC-NEW-11,AC-1,AC-2,AC-3,AC-4,AC-5") +def test_nft_sec_03_mavlink_signing( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """AP rejects all three injection sub-cases within ≤500 ms; no position drift.""" + if fc_adapter == "inav": + pytest.skip( + "AC-1: NFT-SEC-03 is AP-only; iNav (MSP) has no signing layer." + ) + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-03 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the three " + "injection timestamps + AP STATUSTEXT + GLOBAL_POSITION_INT " + "captures. Pure rejection-logic covered by " + "e2e/_unit_tests/helpers/test_mavlink_signing_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-03: fixture not found at {fixture_path}. " + f"`{NFT_SEC_03_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595 + FT-P-09-AP signing handshake " + "(AZ-416)." + ) + + payload = json.loads(fixture_path.read_text()) + injections, statustexts, positions = _parse_payload(payload, fixture_path) + if len(injections) != 3: + pytest.fail( + f"NFT-SEC-03 AC-2..AC-4: fixture must contain exactly 3 " + f"injections (unsigned + wrong_key + replayed); got " + f"{len(injections)} in {fixture_path}." + ) + sub_cases_seen = {inj.sub_case for inj in injections} + expected = {mse.SubCase.UNSIGNED, mse.SubCase.WRONG_KEY, mse.SubCase.REPLAYED} + if sub_cases_seen != expected: + pytest.fail( + f"NFT-SEC-03: fixture missing sub-cases {sorted(s.value for s in expected - sub_cases_seen)} " + f"in {fixture_path}." + ) + + report = mse.evaluate( + injections, statustexts=statustexts, positions=positions + ) + out_csv = ( + evidence_dir + / "nft-sec-03" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + mse.write_csv_evidence(out_csv, report) + + for sub in report.sub_cases: + if sub.rejection_latency_ms is not None: + nfr_recorder.record_metric( + f"nft_sec_03.{sub.sub_case.value}.rejection_latency_ms", + float(sub.rejection_latency_ms), + ac_id=_ac_for(sub.sub_case), + ) + nfr_recorder.record_metric( + f"nft_sec_03.{sub.sub_case.value}.position_drift_m", + sub.position_drift_m, + ac_id=_ac_for(sub.sub_case), + ) + + for sub in report.sub_cases: + ac = _ac_for(sub.sub_case) + assert sub.passes_rejection, ( + f"{ac}: AP did not reject {sub.sub_case.value} GPS_INPUT within " + f"{sub.budget_ms} ms — rejection_at_ms={sub.rejection_at_ms}, " + f"rejection_text={sub.rejection_text!r}, " + f"latency_ms={sub.rejection_latency_ms}." + ) + assert sub.passes_no_position_update, ( + f"{ac}: AP GLOBAL_POSITION_INT drifted " + f"{sub.position_drift_m:.2f} m around injection (tolerance " + f"{mse.POSITION_DRIFT_TOLERANCE_M} m) — the rejection STATUSTEXT " + f"fired but the position update was accepted. This is a " + f"defense-bypass bug (signaling-only rejection without state " + f"enforcement)." + ) + + +def _ac_for(sub_case: mse.SubCase) -> str: + return { + mse.SubCase.UNSIGNED: "AC-2", + mse.SubCase.WRONG_KEY: "AC-3", + mse.SubCase.REPLAYED: "AC-4", + }[sub_case] + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_03_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_03_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_03_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[ + list[mse.InjectionEvent], + list[mse.StatustextSample], + list[mse.PositionSample], +]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-03: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + raw_inj = payload.get("injections") + if not isinstance(raw_inj, list): + pytest.fail( + f"NFT-SEC-03: fixture {fixture_path} 'injections' must be a list" + ) + injections: list[mse.InjectionEvent] = [] + for idx, entry in enumerate(raw_inj): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-SEC-03: injections[{idx}] in {fixture_path} must be an object" + ) + try: + sub_case = mse.SubCase(str(entry["sub_case"])) + except (KeyError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-03: injections[{idx}] in {fixture_path} 'sub_case' " + f"must be one of {sorted(s.value for s in mse.SubCase)}; got {exc}" + ) + try: + injections.append( + mse.InjectionEvent( + sub_case=sub_case, + injected_at_ms=int(entry["injected_at_ms"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-03: injections[{idx}] in {fixture_path} shape invalid: {exc}" + ) + + raw_st = payload.get("statustexts", []) + if not isinstance(raw_st, list): + pytest.fail( + f"NFT-SEC-03: fixture {fixture_path} 'statustexts' must be a list" + ) + statustexts: list[mse.StatustextSample] = [] + for idx, entry in enumerate(raw_st): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-SEC-03: statustexts[{idx}] in {fixture_path} must be an object" + ) + try: + statustexts.append( + mse.StatustextSample( + monotonic_ms=int(entry["monotonic_ms"]), + text=str(entry["text"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-03: statustexts[{idx}] in {fixture_path} shape invalid: {exc}" + ) + + raw_pos = payload.get("positions", []) + if not isinstance(raw_pos, list): + pytest.fail( + f"NFT-SEC-03: fixture {fixture_path} 'positions' must be a list" + ) + positions: list[mse.PositionSample] = [] + for idx, entry in enumerate(raw_pos): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-SEC-03: positions[{idx}] in {fixture_path} must be an object" + ) + try: + positions.append( + mse.PositionSample( + monotonic_ms=int(entry["monotonic_ms"]), + lat_e7=int(entry["lat_e7"]), + lon_e7=int(entry["lon_e7"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-03: positions[{idx}] in {fixture_path} shape invalid: {exc}" + ) + + return injections, statustexts, positions diff --git a/e2e/tests/security/test_nft_sec_04_asan_fuzz.py b/e2e/tests/security/test_nft_sec_04_asan_fuzz.py new file mode 100644 index 0000000..12c1fd7 --- /dev/null +++ b/e2e/tests/security/test_nft_sec_04_asan_fuzz.py @@ -0,0 +1,183 @@ +"""NFT-SEC-04 ≥4 h ASan fuzz — release-gated (AZ-439 / RESTRICT-CVE-1 AC-2 + AC-3). + +Companion to ``test_nft_sec_04_opencv_cve.py`` (the always-run probe). +This scenario consumes the captured fuzz-run summary (ASan stderr log ++ duration + corpus size) and asserts: + +* AC-2: 0 ASan findings of any category; +* AC-3: ≥1000 unique JPEG corpus inputs (informational only — does NOT + contribute to ``passes`` so a fuzz with high finding count + low + corpus fails for the finding count, not the coverage proxy). + +Release-gated by ``E2E_NFT_SEC_04_RELEASE_GATE=1`` because the fuzz +run takes ≥4 h. fc_adapter parameterization is irrelevant for image +decode (AC-4): only the ``ardupilot`` parameterization actually executes; +the rest skip cleanly to avoid duplicating a 4 h run. + +Production dependencies surfaced: + +* **AZ-444 (Tier-2 harness)**: optional. The Tier-1 path can run a + shorter fuzz against the ASan SUT image on x86; Tier-2 runs the same + fuzz on Jetson with the same SUT image. +* **AZ-595**: emit ``nft_sec_04_asan_fuzz.json`` carrying the captured + ASan stderr log lines + duration + corpus size. + +Fixture JSON shape:: + + { + "duration_seconds": , + "corpus_size": , + "asan_log_lines": [, , ...] + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import asan_fuzz_evaluator as afe + +NFT_SEC_04_ASAN_FIXTURE_ENV_VAR = "E2E_NFT_SEC_04_ASAN_FIXTURE" +NFT_SEC_04_ASAN_DEFAULT_FIXTURE_NAME = "nft_sec_04_asan_fuzz.json" +NFT_SEC_04_RELEASE_GATE_ENV_VAR = "E2E_NFT_SEC_04_RELEASE_GATE" + + +@pytest.mark.scenario_id("nft-sec-04-asan-fuzz") +@pytest.mark.traces_to("RESTRICT-CVE-1,AC-2,AC-3,AC-4") +def test_nft_sec_04_asan_fuzz( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """0 ASan findings across ≥4 h JPEG-fuzz; corpus ≥1000 (informational).""" + if not _release_gate_enabled(): + pytest.skip( + "NFT-SEC-04 ASan-fuzz is release-gated (≥4 h run). Set " + f"`{NFT_SEC_04_RELEASE_GATE_ENV_VAR}=1` to execute. The " + "probe scenario (test_nft_sec_04_opencv_cve.py) covers " + "RESTRICT-CVE-1 AC-1 on every CI run." + ) + if fc_adapter != "ardupilot": + pytest.skip( + "AC-4: NFT-SEC-04 ASan-fuzz is fc_adapter-agnostic (image " + "decode is upstream of FC); only run once per vio_strategy " + "under fc_adapter=ardupilot to avoid duplicating a 4 h run." + ) + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-04 ASan-fuzz requires `E2E_SITL_REPLAY_DIR` to point " + "at a prepared SITL replay fixture (AZ-595) carrying the " + "captured fuzz-run summary. Pure ASan log classification + " + "verdict logic covered by " + "e2e/_unit_tests/helpers/test_asan_fuzz_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-04 ASan-fuzz: fixture not found at {fixture_path}. " + f"`{NFT_SEC_04_ASAN_FIXTURE_ENV_VAR}` env var must point at a " + "JSON file with the schema documented in the scenario " + "docstring. Production dependency: AZ-595 + (optional) AZ-444." + ) + + payload = json.loads(fixture_path.read_text()) + duration_s, corpus_size, log_lines = _parse_payload(payload, fixture_path) + report = afe.evaluate( + log_lines, + duration_seconds=duration_s, + corpus_size=corpus_size, + ) + out_csv = ( + evidence_dir + / "nft-sec-04" + / f"{fc_adapter}-{vio_strategy}-asan-fuzz.csv" + ) + afe.write_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_sec_04.asan_finding_count", + float(len(report.findings)), + ac_id="AC-2", + ) + nfr_recorder.record_metric( + "nft_sec_04.fuzz_duration_seconds", + report.duration_seconds, + ac_id="AC-2", + ) + nfr_recorder.record_metric( + "nft_sec_04.fuzz_corpus_size", + float(report.corpus_size), + ac_id="AC-3", + ) + + assert report.passes_duration, ( + f"AC-2 pre-condition: fuzz duration {report.duration_seconds:.0f} s " + f"is below the required ≥{afe.MIN_FUZZ_DURATION_SECONDS} s — " + "the 0-finding result is not statistically meaningful for the " + "RESTRICT-CVE-1 budget without the full window." + ) + assert report.passes_findings, ( + f"AC-2: {len(report.findings)} ASan finding(s) recorded — " + f"see `nft-sec-04/{fc_adapter}-{vio_strategy}-asan-fuzz.csv` " + f"for per-finding categories. Any finding is a release-blocker." + ) + # AC-3 is informational: emit a warning-style fail-fast message via + # the evidence CSV (already written above) but do NOT fail the test. + # The user is expected to inspect the corpus floor manually. + + +def _release_gate_enabled() -> bool: + return os.environ.get(NFT_SEC_04_RELEASE_GATE_ENV_VAR, "").strip().lower() in ( + "1", + "true", + "yes", + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_04_ASAN_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_04_ASAN_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_04_ASAN_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[float, int, list[str]]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} must be a JSON " + f"object; got top-level type={type(payload).__name__}" + ) + try: + duration_s = float(payload["duration_seconds"]) + corpus_size = int(payload["corpus_size"]) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} missing/invalid " + f"duration_seconds or corpus_size: {exc}" + ) + raw_lines = payload.get("asan_log_lines", []) + if not isinstance(raw_lines, list): + pytest.fail( + f"NFT-SEC-04 ASan-fuzz: fixture {fixture_path} " + f"'asan_log_lines' must be a list (may be empty)" + ) + log_lines = [str(line) for line in raw_lines] + return duration_s, corpus_size, log_lines diff --git a/e2e/tests/security/test_nft_sec_04_opencv_cve.py b/e2e/tests/security/test_nft_sec_04_opencv_cve.py new file mode 100644 index 0000000..99f65d0 --- /dev/null +++ b/e2e/tests/security/test_nft_sec_04_opencv_cve.py @@ -0,0 +1,173 @@ +"""NFT-SEC-04 probe — OpenCV CVE-2025-53644 no-crash (AZ-439 / RESTRICT-CVE-1). + +Always-runs (Tier-1 OR Tier-2). The crafted ``cve-2025-53644.jpg`` is +fed to the SUT's nav-camera as a single frame and the FDR archive is +inspected: + +* AC-1a: at least one FDR record exists strictly after the probe + injection (proves the SUT process did not crash); +* AC-1b: the FDR record matched within ``±tolerance_ms`` of the probe + is one of ``decode-success`` or ``frame-decode-error`` (proves the + SUT either decoded the patched JPEG or gracefully rejected it). + +The companion ≥4 h ASan fuzz lives in +``test_nft_sec_04_asan_fuzz.py`` and is release-gated. + +Production dependencies surfaced: + +* **AZ-595**: emit ``nft_sec_04_cve_probe.json`` carrying + ``probe_injected_at_ms`` + the per-frame FDR record sequence the + runner captured; +* **SUT**: the SUT MUST honor its FDR per-frame outcome contract — a + silent drop is treated as a defense-bypass failure even when the + process does not crash. + +Fixture JSON shape:: + + { + "probe_injected_at_ms": , + "tolerance_ms": , + "fdr_records": [ + {"monotonic_ms": , "kind": }, ... + ] + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import cve_probe_evaluator as cpe + +NFT_SEC_04_FIXTURE_ENV_VAR = "E2E_NFT_SEC_04_FIXTURE" +NFT_SEC_04_DEFAULT_FIXTURE_NAME = "nft_sec_04_cve_probe.json" + + +@pytest.mark.scenario_id("nft-sec-04") +@pytest.mark.traces_to("RESTRICT-CVE-1,AC-1,AC-4") +def test_nft_sec_04_opencv_cve_probe( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """SUT survives the crafted JPEG and records a deterministic outcome.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-04 probe requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the post-probe " + "FDR record sequence. Pure no-crash / outcome-classification " + "logic covered by " + "e2e/_unit_tests/helpers/test_cve_probe_evaluator.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-04 probe: fixture not found at {fixture_path}. " + f"`{NFT_SEC_04_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + probe_at_ms, tolerance_ms, fdr_records = _parse_payload(payload, fixture_path) + report = cpe.evaluate( + fdr_records, + probe_injected_at_ms=probe_at_ms, + tolerance_ms=tolerance_ms, + ) + out_csv = ( + evidence_dir + / "nft-sec-04" + / f"{fc_adapter}-{vio_strategy}-probe.csv" + ) + cpe.write_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_sec_04.probe_outcome_is_decode_success", + 1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.DECODE_SUCCESS else 0.0, + ac_id="AC-1", + ) + nfr_recorder.record_metric( + "nft_sec_04.probe_outcome_is_graceful_error", + 1.0 if report.probe_outcome is cpe.ProbeFrameOutcome.FRAME_DECODE_ERROR else 0.0, + ac_id="AC-1", + ) + + assert report.passes_no_crash, ( + f"AC-1a: SUT did not produce any FDR record after probe injection " + f"at {report.probe_injected_at_ms} ms — process likely crashed. " + f"last_fdr_record_at_ms={report.last_fdr_record_at_ms}." + ) + assert report.passes_graceful_outcome, ( + f"AC-1b: SUT silently dropped the probe frame (no decode-success " + f"or frame-decode-error in FDR within ±{tolerance_ms} ms of " + f"probe injection at {report.probe_injected_at_ms} ms). Silent " + f"drops are a defense-bypass failure even if the process did not " + f"crash." + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_04_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_04_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_04_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[int, int, list[cpe.FdrSurvivalRecord]]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-04 probe: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + try: + probe_at = int(payload["probe_injected_at_ms"]) + tolerance = int(payload.get("tolerance_ms", 50)) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-04 probe: fixture {fixture_path} missing/invalid " + f"probe_injected_at_ms or tolerance_ms: {exc}" + ) + raw_records = payload.get("fdr_records") + if not isinstance(raw_records, list): + pytest.fail( + f"NFT-SEC-04 probe: fixture {fixture_path} 'fdr_records' must be a list" + ) + records: list[cpe.FdrSurvivalRecord] = [] + for idx, entry in enumerate(raw_records): + if not isinstance(entry, dict): + pytest.fail( + f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} " + f"must be an object" + ) + try: + records.append( + cpe.FdrSurvivalRecord( + monotonic_ms=int(entry["monotonic_ms"]), + kind=str(entry["kind"]), + ) + ) + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-04 probe: fdr_records[{idx}] in {fixture_path} " + f"shape invalid: {exc}" + ) + return probe_at, tolerance, records diff --git a/e2e/tests/security/test_nft_sec_05_dns_blackhole.py b/e2e/tests/security/test_nft_sec_05_dns_blackhole.py new file mode 100644 index 0000000..a4d5277 --- /dev/null +++ b/e2e/tests/security/test_nft_sec_05_dns_blackhole.py @@ -0,0 +1,170 @@ +"""NFT-SEC-05 — DNS-blackhole defense-in-depth (AZ-437 / AC-NEW-10, residual-risk #1). + +Tier-1 OR Tier-2. Even if ``e2e-net.internal: true`` is misconfigured, +the DNS-blackhole sidecar MUST prevent DNS-based exfiltration. The +runner executes a ``nslookup`` inside the SUT container's network +namespace and asserts: + +* AC-2: the sidecar's health endpoint returns healthy; +* AC-3a: the lookup *fails* (NXDOMAIN, timeout, "no servers can be + reached", or any other documented failure outcome); +* AC-3b: no UDP-53 packets cross the host's outbound interface during + the probe. + +The combined verdict object is sourced from the SITL replay fixture +(AZ-595) for the same reason NFT-SEC-02 is fixture-sourced: the live +``docker exec`` + host-interface-counter pipeline only exists inside the +harness. + +Production dependency surfaced to AZ-595: fixture JSON shape + + { + "sidecar_healthy": , + "lookup_outcome": "nxdomain" | "timeout" | "no_servers_can_be_reached" + | "other_failure" | "success", + "before": {... egress snapshot ...}, + "after": {... egress snapshot ...} + } +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from runner.helpers import egress_observer as eo + +NFT_SEC_05_FIXTURE_ENV_VAR = "E2E_NFT_SEC_05_FIXTURE" +NFT_SEC_05_DEFAULT_FIXTURE_NAME = "nft_sec_05_dns_blackhole.json" + + +@pytest.mark.scenario_id("nft-sec-05") +@pytest.mark.traces_to("AC-NEW-10,AC-2,AC-3,AC-4") +def test_nft_sec_05_dns_blackhole( + fc_adapter: str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + sitl_replay_ready: bool, +) -> None: + """Sidecar healthy + lookup fails + UDP-53 silent.""" + if not sitl_replay_ready: + pytest.skip( + "NFT-SEC-05 requires `E2E_SITL_REPLAY_DIR` to point at a " + "prepared SITL replay fixture (AZ-595) carrying the DNS-probe " + "outcome + UDP-53 counter snapshots. Pure verdict logic " + "covered by e2e/_unit_tests/helpers/test_egress_observer.py." + ) + + fixture_path = _resolve_fixture_path() + if not fixture_path.is_file(): + pytest.fail( + f"NFT-SEC-05: fixture not found at {fixture_path}. " + f"`{NFT_SEC_05_FIXTURE_ENV_VAR}` env var must point at a JSON " + "file with the schema documented in the scenario docstring. " + "Production dependency: AZ-595." + ) + + payload = json.loads(fixture_path.read_text()) + before, after, lookup_outcome, sidecar_healthy = _parse_payload( + payload, fixture_path + ) + report = eo.evaluate_dns_blackhole( + before, + after, + lookup_outcome=lookup_outcome, + sidecar_healthy=sidecar_healthy, + ) + out_csv = ( + evidence_dir + / "nft-sec-05" + / f"{fc_adapter}-{vio_strategy}.csv" + ) + eo.write_dns_blackhole_csv_evidence(out_csv, report) + + nfr_recorder.record_metric( + "nft_sec_05.udp53_egress_delta", + float(report.delta_udp53), + ac_id="AC-3", + ) + + assert report.sidecar_healthy, ( + "AC-2: DNS blackhole sidecar reported unhealthy — defense-in-depth " + "is unavailable; SUT egress isolation is the only layer protecting " + "data residency." + ) + assert report.passes_lookup, ( + f"AC-3a: nslookup outcome = {report.lookup_outcome.value} — DNS " + "resolution must FAIL inside the SUT container (NXDOMAIN, timeout, " + "no-servers, or other-failure). SUCCESS means an exfiltration " + "path exists." + ) + assert report.passes_udp_silence, ( + f"AC-3b: UDP-53 egress delta = {report.delta_udp53} packets " + "during probe (budget = 0). Even a single packet leaving the " + "host means the DNS-blackhole sidecar failed to absorb the probe." + ) + + +def _resolve_fixture_path() -> Path: + raw = os.environ.get(NFT_SEC_05_FIXTURE_ENV_VAR, "").strip() + from runner.helpers import sitl_observer + + root = sitl_observer.replay_dir() + if not raw: + if root is None: + return Path(f"<{NFT_SEC_05_FIXTURE_ENV_VAR}-unset>") + return root / NFT_SEC_05_DEFAULT_FIXTURE_NAME + path = Path(raw) + if not path.is_absolute() and root is not None: + path = root / path + return path + + +def _parse_payload( + payload: object, fixture_path: Path +) -> tuple[eo.EgressCounterSnapshot, eo.EgressCounterSnapshot, eo.DnsLookupOutcome, bool]: + if not isinstance(payload, dict): + pytest.fail( + f"NFT-SEC-05: fixture {fixture_path} must be a JSON object; " + f"got top-level type={type(payload).__name__}" + ) + try: + sidecar_healthy = bool(payload["sidecar_healthy"]) + outcome_raw = str(payload["lookup_outcome"]) + try: + lookup_outcome = eo.DnsLookupOutcome(outcome_raw) + except ValueError as exc: + pytest.fail( + f"NFT-SEC-05: fixture {fixture_path} 'lookup_outcome' must " + f"be one of " + f"{sorted(o.value for o in eo.DnsLookupOutcome)}; got " + f"{outcome_raw!r} ({exc})" + ) + before = _parse_snapshot(payload["before"], fixture_path, "before") + after = _parse_snapshot(payload["after"], fixture_path, "after") + except (KeyError, TypeError, ValueError) as exc: + pytest.fail( + f"NFT-SEC-05: fixture {fixture_path} shape invalid: {exc}" + ) + return before, after, lookup_outcome, sidecar_healthy + + +def _parse_snapshot( + raw: object, fixture_path: Path, label: str +) -> eo.EgressCounterSnapshot: + if not isinstance(raw, dict): + pytest.fail( + f"NFT-SEC-05: fixture {fixture_path} '{label}' must be an object" + ) + return eo.EgressCounterSnapshot( + egress_packets_to_internal_net=int(raw["egress_packets_to_internal_net"]), + egress_packets_to_other_destinations=int( + raw["egress_packets_to_other_destinations"] + ), + udp53_egress_packets=int(raw["udp53_egress_packets"]), + )