diff --git a/_docs/02_tasks/todo/AZ-424_ft_n_01_outlier_tolerance.md b/_docs/02_tasks/done/AZ-424_ft_n_01_outlier_tolerance.md
similarity index 100%
rename from _docs/02_tasks/todo/AZ-424_ft_n_01_outlier_tolerance.md
rename to _docs/02_tasks/done/AZ-424_ft_n_01_outlier_tolerance.md
diff --git a/_docs/02_tasks/todo/AZ-425_ft_n_03_outage_reloc.md b/_docs/02_tasks/done/AZ-425_ft_n_03_outage_reloc.md
similarity index 100%
rename from _docs/02_tasks/todo/AZ-425_ft_n_03_outage_reloc.md
rename to _docs/02_tasks/done/AZ-425_ft_n_03_outage_reloc.md
diff --git a/_docs/02_tasks/todo/AZ-426_ft_n_04_blackout_spoof.md b/_docs/02_tasks/done/AZ-426_ft_n_04_blackout_spoof.md
similarity index 100%
rename from _docs/02_tasks/todo/AZ-426_ft_n_04_blackout_spoof.md
rename to _docs/02_tasks/done/AZ-426_ft_n_04_blackout_spoof.md
diff --git a/_docs/03_implementation/batch_73_report.md b/_docs/03_implementation/batch_73_report.md
new file mode 100644
index 0000000..17d7ce8
--- /dev/null
+++ b/_docs/03_implementation/batch_73_report.md
@@ -0,0 +1,141 @@
+# Batch 73 Report — Test Implementation (cycle 1, batch 7 of test phase)
+
+**Batch**: 73
+**Date**: 2026-05-17
+**Context**: Test implementation (greenfield Step 10 — Implement Tests)
+**Tasks**: AZ-424 (3pt), AZ-425 (3pt), AZ-426 (5pt) — 11 cp / 3 tasks
+**Cycle**: 1
+**Verdict**: COMPLETE — PASS (self-reviewed; see `reviews/batch_73_review.md`)
+
+## Summary
+
+The negatives set — FT-N-01 / FT-N-03 / FT-N-04 — the project's
+failure-mode robustness suite (AC-3.1, AC-3.4, AC-3.5, AC-NEW-8).
+Same pattern as the prior batches in this phase:
+
+* Pure-logic evaluator under `e2e/runner/helpers/` (everything the
+  scenario can express without docker-bound SITL access).
+* Scenario file under `e2e/tests/negative/`, parameterised across
+  conftest fixtures, skip-gated on upstream replay / FDR / mavproxy
+  / SITL observer helpers (auto-activates when AZ-441 + AZ-407 +
+  AZ-416 leftovers land).
+* Helper-driven unit test file under `e2e/_unit_tests/helpers/`.
+
+### AZ-424 — FT-N-01 350 m outlier injection tolerance (3pt)
+
+* **`runner/helpers/outlier_tolerance_evaluator.py`** — three
+  invariants:
+  - AC-1: count gate — `MIN_OUTLIER_COUNT = 10` outliers across the
+    Derkachi 8-min `--density medium` replay (the AC-3.1 envelope).
+  - AC-2: per-event drift bound — `error_after_outlier −
+    error_before_outlier ≤ DRIFT_BUDGET_M = 50.0`. `before` / `after`
+    are the immediate neighbour frames in the outbound stream;
+    `distance_m` is the shared Vincenty helper.
+  - AC-3: covariance monotonic across the 3-frame window centred on
+    the outlier (`COVARIANCE_WINDOW_FRAMES = 3`).
+  - Plus `load_outlier_manifest` (reads the AZ-408 injector's
+    `manifest.csv`) and `write_csv_evidence`.
+* **`tests/negative/test_ft_n_01_outlier_tolerance.py`** — scenario
+  indirect-parametrises `outlier_injection_derkachi` at
+  `density="medium", seed=0`, drives replay, collects FDR
+  `outbound_estimate` records, joins them to per-frame GT, evaluates,
+  asserts per-event `passes_drift` + `passes_covariance` plus the
+  aggregate `passes_count`. Records NFR metrics
+  `ft_n_01.total_outliers`, `ft_n_01.failed_event_count`, per-event
+  `drift_m` + `cov_non_decreasing`.
+* **14 unit tests** in `test_outlier_tolerance_evaluator.py`.
+
+### AZ-425 — FT-N-03 Extended outage triggers operator re-loc request (3pt)
+
+* **`runner/helpers/outage_request_evaluator.py`** — first detects
+  outage windows from frame-index gaps (≥`MIN_OUTAGE_FRAMES = 3`
+  consecutive missing frames), then per-window evaluates:
+  - AC-2: STATUSTEXT `OPERATOR_RELOC_REQUEST` observed at
+    `[OUTAGE_THRESHOLD_S − TOLERANCE_S, OUTAGE_THRESHOLD_S +
+    TOLERANCE_S] = [1.5, 2.5] s` after outage onset.
+  - AC-3: at least one `source_label = dead_reckoned` outbound
+    emission inside the window.
+  - AC-4: zero FC-side EKF divergence events inside the window
+    (observable via SITL state read).
+  - Plus `detect_outage_windows` (with explicit handling for trailing
+    windows + multi-window flights) and `write_csv_evidence`.
+* **`tests/negative/test_ft_n_03_outage_reloc.py`** — scenario drives
+  replay with a 3-frame outage injector (a future thin extension of
+  the AZ-408 outlier injector), reads FDR `frame_received` +
+  `outbound_estimate` records to reconstruct
+  `expected_frame_indices` and the estimate stream, walks the
+  mavproxy `.tlog` for STATUSTEXT, and pulls EKF divergence events
+  via `sitl_observer.read_ekf_divergence_events()`. Records per-window
+  NFR metrics with AC IDs (`length_frames`, `statustext_offset_ms`,
+  `dead_reckoned_count`, `ekf_divergence_count`).
+* **18 unit tests** in `test_outage_request_evaluator.py`.
+
+### AZ-426 — FT-N-04 Visual blackout + spoofed GPS combined failsafe (5pt)
+
+* **`runner/helpers/blackout_spoof_evaluator.py`** — the most ladder-
+  heavy evaluator in the project: eight per-AC sub-reports stitched
+  into one `BlackoutSpoofReport`. Constants pulled into the module
+  header so the spec can be diffed against code in one place:
+  `SWITCH_LATENCY_MS = 400` (AC-1),
+  `HONEST_ACCURACY_RATIO = 0.95` (AC-4),
+  `STATUSTEXT_RATE_MIN_HZ = 1.0` / `STATUSTEXT_RATE_MAX_HZ = 2.0` (AC-5),
+  `ESCALATION_COV_2D_M = 100.0` (AC-6),
+  `ESCALATION_COV_FAILSAFE_M = 500.0`, `ESCALATION_DURATION_FAILSAFE_S = 30.0`,
+  `ESCALATION_LATENCY_MS = 500` (AC-7),
+  `RECOVERY_STABLE_S = 10.0` (AC-8).
+  Per-AC analysers:
+  - `evaluate_switch_latency`: budget = `min(SWITCH_LATENCY_MS,
+    frame_period_ms)` — the spec's "≤1 frame OR ≤400 ms (whichever is
+    shorter)" wording, made explicit.
+  - `evaluate_spoof_rejection`: requires both ≥1 FDR
+    `spoof-rejected` event AND zero `satellite_anchored` emissions
+    inside the window (so the SUT cannot silently re-promote on a
+    spoofed lock).
+  - `evaluate_covariance_monotonic`: first non-decreasing violation
+    timestamp + binary pass.
+  - `evaluate_honest_accuracy`: per-sample `horiz_accuracy ≥ 0.95 ×
+    cov_semi_major_m`. Boundary test pins the spec budget.
+  - `evaluate_statustext_rate`: `VISUAL_BLACKOUT_IMU_ONLY` rate over
+    the window must land in [1, 2] Hz.
+  - `evaluate_escalation` (35 s window only): AC-6 fix_type degrades
+    on the first cov-100 m crossing; AC-7 triggers on the earliest
+    of cov-500 m crossing OR 30 s duration. Non-35 s windows pass
+    vacuously — they aren't expected to hit either threshold.
+  - `evaluate_recovery_gate`: AC-8 — ≥10 s of healthy + non-spoofed
+    FC GPS + a consistency-check pass before re-promoting to
+    `satellite_anchored` post-window.
+* **`tests/negative/test_ft_n_04_blackout_spoof.py`** — scenario
+  indirect-parametrises `blackout_spoof_derkachi` over
+  `_WINDOW_LADDER_S = (5.0, 15.0, 35.0)` with ids `["5s", "15s",
+  "35s"]`. Collects FDR `outbound_estimate` + `spoof_rejected`,
+  mavproxy STATUSTEXT, and SITL GPS-health + consistency-check
+  samples. Asserts each AC with a descriptive failure message that
+  surfaces the relevant sub-report fields.
+* **29 unit tests** in `test_blackout_spoof_evaluator.py`.
+
+## Layout invariant
+
+`e2e/_unit_tests/test_directory_layout.py` now lists the three new
+evaluators and the three new scenario files.
+
+## Test Results
+
+* New unit tests: 14 + 18 + 29 = **61**.
+* Plus 6 new entries in `test_required_path_exists` parametrize
+  (3 helpers + 3 scenarios).
+* Full `e2e/_unit_tests` suite: **527 passed in 130 s** (previous
+  cumulative: 460 → +67 net).
+* Scenario collection across the three negatives: 48 items
+  parametrized; the session-end `/e2e-results/evidence/per-nfr`
+  teardown error is the same pre-existing `nfr_recorder` wart
+  documented in batches 69-72 — not a regression of this batch and
+  not blocking unit-suite collection.
+
+## State
+
+* Specs moved: `_docs/02_tasks/todo/AZ-{424,425,426}_*.md` →
+  `_docs/02_tasks/done/`.
+* `_docs/_autodev_state.md` advanced to
+  `last_completed_batch: 73`.
+* Cumulative review window: `last_cumulative_review = batches_70-72`;
+  the next K=3 cumulative review fires at the end of batch 75.
diff --git a/_docs/03_implementation/reviews/batch_73_review.md b/_docs/03_implementation/reviews/batch_73_review.md
new file mode 100644
index 0000000..e081f3a
--- /dev/null
+++ b/_docs/03_implementation/reviews/batch_73_review.md
@@ -0,0 +1,173 @@
+# Code Review Report
+
+**Batch**: 73 — AZ-424, AZ-425, AZ-426
+**Date**: 2026-05-17
+**Verdict**: PASS
+
+## Findings
+
+(none)
+
+## Findings Sweep
+
+### Phase 1 — Context Loading
+
+Loaded specs `AZ-424_ft_n_01_outlier_tolerance.md`,
+`AZ-425_ft_n_03_outage_reloc.md`, `AZ-426_ft_n_04_blackout_spoof.md`.
+Re-read injector surfaces touched by the new evaluators:
+`e2e/fixtures/injectors/outlier.py` (manifest.csv schema +
+`OutlierInjectionReport.out_root`), `e2e/fixtures/injectors/blackout_spoof.py`
+(`BlackoutSpoofPlan`, `BlackoutSpoofSchedule.window_start_ms / window_end_ms`,
+spoofed-GPS cadence + AC-NEW-8 200-500 m delta bounds). Re-read existing
+fixture wiring in `e2e/runner/helpers/injector_fixtures.py` to confirm
+`outlier_injection_derkachi` and `blackout_spoof_derkachi` parametrize
+on `density` / `window_seconds`. Re-read the scenario template used in
+batch 71/72 (`tests/positive/test_ft_p_10_smoothing_lookback.py`,
+`tests/negative/test_ft_n_02_sharp_turn_failure.py`) for the
+`_harness_helpers_implemented` gate pattern and the FDR / mavproxy /
+sitl_observer access conventions.
+
+### Phase 2 — Spec Compliance
+
+**AZ-424 (FT-N-01)**
+
+| AC | Coverage | Status |
+|----|----------|--------|
+| AC-1 (medium-density injection; ≥10 outliers) | `test_constants_match_spec`, `test_evaluate_count_below_minimum_fails`, `test_evaluate_count_at_minimum_passes_count_gate`, scenario assertion via `MIN_OUTLIER_COUNT` | Covered |
+| AC-2 (drift bound ≤50 m per outlier) | `test_evaluate_event_drift_within_budget`, `test_evaluate_event_drift_exceeds_budget_fails`, `test_evaluate_event_missing_neighbour_drift_none`, scenario per-event assertion via `OutlierEventReport.passes_drift` | Covered |
+| AC-3 (covariance monotonic across 3-frame window) | `test_evaluate_event_cov_monotonic_passes`, `test_evaluate_event_cov_decreasing_fails`, `test_evaluate_event_cov_flat_window_passes`, scenario assertion via `passes_covariance` | Covered |
+| AC-4 (parameterization per fc_adapter × vio_strategy) | scenario uses conftest `fc_adapter`/`vio_strategy` fixtures + indirect `outlier_injection_derkachi` (density=medium, seed=0) | Covered |
+| CSV evidence | `test_write_csv_evidence_round_trips`, scenario writes `ft-n-01-{fc_adapter}-{vio_strategy}.csv` | Covered |
+
+**AZ-425 (FT-N-03)**
+
+| AC | Coverage | Status |
+|----|----------|--------|
+| AC-1 (≥3 consecutive missing frames) | `test_detect_no_outage_returns_empty`, `test_detect_run_below_min_length_ignored`, `test_detect_single_outage_window`, `test_detect_multiple_windows`, `test_detect_trailing_outage_window`, scenario assertion via `passes_min_length` | Covered |
+| AC-2 (STATUSTEXT `OPERATOR_RELOC_REQUEST` within 2 s ±500 ms of onset) | `test_statustext_within_tolerance_passes`, `test_statustext_within_tolerance_late_passes`, `test_statustext_too_early_fails`, `test_statustext_too_late_fails`, `test_statustext_missing_fails`, `test_statustext_payload_mismatch_fails`, scenario assertion via `passes_statustext` | Covered |
+| AC-3 (dead_reckoned label during outage) | `test_dead_reckoned_during_window_passes`, `test_dead_reckoned_absent_fails`, scenario assertion via `passes_dead_reckoned` | Covered |
+| AC-4 (no FC EKF divergence event during outage) | `test_ekf_divergence_during_window_fails`, `test_ekf_divergence_outside_window_ignored`, scenario assertion via `passes_ekf` | Covered |
+| AC-5 (parameterization) | scenario uses conftest `fc_adapter`/`vio_strategy` fixtures | Covered |
+| CSV evidence | `test_write_csv_evidence_round_trips`, scenario writes `ft-n-03-{fc_adapter}-{vio_strategy}.csv` | Covered |
+
+**AZ-426 (FT-N-04)**
+
+| AC | Coverage | Status |
+|----|----------|--------|
+| AC-1 (switch latency ≤1 frame OR ≤400 ms) | `test_switch_latency_within_400_ms_passes` (validates `min(400, frame_period_ms)` budget), `test_switch_latency_within_one_frame_passes`, `test_switch_latency_at_one_frame_boundary_passes`, `test_switch_latency_missing_dead_reckoned_fails`, scenario assertion via `switch_latency.passes` | Covered |
+| AC-2 (spoof-rejected events AND no satellite re-anchor inside window) | `test_spoof_rejection_pass`, `test_spoof_rejection_no_events_fails`, `test_spoof_rejection_label_returns_to_satellite_fails`, scenario assertion via `spoof_rejection.passes` | Covered |
+| AC-3 (covariance monotonic) | `test_covariance_monotonic_pass`, `test_covariance_monotonic_decreasing_fails`, scenario assertion via `covariance_monotonic.passes` | Covered |
+| AC-4 (`horiz_accuracy ≥ 0.95 × cov_semi_major_m`) | `test_honest_accuracy_pass`, `test_honest_accuracy_boundary_pass`, `test_honest_accuracy_violation_fails`, scenario assertion via `honest_accuracy.passes` | Covered |
+| AC-5 (`VISUAL_BLACKOUT_IMU_ONLY` rate ∈ [1, 2] Hz) | `test_statustext_rate_pass_at_1hz`, `test_statustext_rate_pass_at_2hz`, `test_statustext_rate_too_slow_fails`, `test_statustext_rate_too_fast_fails`, scenario assertion via `statustext_rate.passes` | Covered |
+| AC-6 (35 s only: cov 100 m → fix_type ≤2D) | `test_escalation_non_35s_window_passes_vacuously`, `test_escalation_35s_ac6_fix_type_degraded_passes`, `test_escalation_35s_ac6_fix_type_not_degraded_fails`, scenario assertion gated on `is_35s` via `escalation.passes_ac6` | Covered |
+| AC-7 (35 s only: cov 500 m OR 30 s duration → `horiz=999`, `VISUAL_BLACKOUT_FAILSAFE` within 500 ms) | `test_escalation_35s_no_crossings_passes` (vacuous on duration-only path), `test_escalation_35s_ac7_horiz_not_999_fails`, scenario assertion gated on `is_35s` via `escalation.passes_ac7` | Covered |
+| AC-8 (recovery gate: ≥10 s stable + consistency check pass) | `test_recovery_gate_pass`, `test_recovery_gate_unstable_fails`, `test_recovery_gate_spoofed_fails`, `test_recovery_gate_no_consistency_check_fails`, `test_recovery_gate_no_recovery_attempt_vacuous_pass`, scenario assertion via `recovery_gate.passes` | Covered |
+| AC-9 (parameterization × 3 windows) | scenario indirect-parametrizes `blackout_spoof_derkachi` over `_WINDOW_LADDER_S = (5.0, 15.0, 35.0)` with ids `["5s", "15s", "35s"]`; conftest `fc_adapter`/`vio_strategy` adds 6 variants = 18 collected items per fc_adapter pair | Covered |
+| CSV evidence | `test_write_csv_evidence_round_trips`, scenario writes `ft-n-04-{window_s}s-{fc_adapter}-{vio_strategy}.csv` | Covered |
+
+### Phase 3 — Code Quality
+
+* **Single responsibility**: each evaluator is one module with one
+  responsibility — `outlier_tolerance_evaluator` aggregates per-event
+  AC-2/AC-3 reports; `outage_request_evaluator` detects outage windows
+  and evaluates AC-1..AC-4 per window; `blackout_spoof_evaluator`
+  evaluates the AC-1..AC-8 ladder against one `BlackoutWindow`. None
+  of the three pulls in scenario-specific helpers (drive replay /
+  collect samples) — those live in the scenario test files.
+* **Method naming**: per-AC evaluators are named after the AC concern
+  (`evaluate_switch_latency`, `evaluate_spoof_rejection`,
+  `evaluate_covariance_monotonic`, `evaluate_honest_accuracy`,
+  `evaluate_statustext_rate`, `evaluate_escalation`,
+  `evaluate_recovery_gate`). The aggregate `evaluate(...)` in each
+  module composes the per-AC reports into a single dataclass.
+* **No suppressed errors**: `load_outlier_manifest` raises on missing
+  file and missing columns; the manifest writer raises naturally on
+  ENOENT; the evaluator helpers raise no exceptions of their own.
+  No bare `except`, no `2>/dev/null`-equivalents.
+* **AAA comment discipline**: every test uses `# Arrange / # Act /
+  # Assert`; sections are omitted when not needed (e.g. constant
+  invariant tests just have `# Assert`).
+* **Public boundary**: confirmed all three evaluators import only from
+  the `e2e.runner.helpers.geo` symbol (when needed) and dataclasses /
+  stdlib. No `from gps_denied_onboard ...`. Confirmed via grep.
+
+### Phase 4 — Security
+
+* **No new secrets, credentials, or network paths**. All three
+  evaluators are pure-logic over already-collected samples / events.
+* **Spoof rejection (AC-2)** is the project's primary anti-spoof
+  invariant; the evaluator does not bypass it — it asserts the FDR
+  recorded the rejection AND that the source-label state machine did
+  not silently re-promote to `satellite_anchored` inside the window.
+* **Honest accuracy (AC-4)** ensures the SUT cannot under-report
+  uncertainty to the FC. The evaluator's check is `horiz_accuracy ≥
+  0.95 × cov_semi_major_m` per the spec; we explicitly cover the
+  boundary in `test_honest_accuracy_boundary_pass` so a future
+  implementation cannot pass by emitting `horiz = cov` while the spec
+  budget is `0.95 × cov`.
+
+### Phase 5 — Performance
+
+All three evaluators are O(N) over their input sequences (single
+pass over estimates, single pass over events, single pass over
+statustexts). No nested scans beyond the bounded 3-frame window in
+`outlier_tolerance_evaluator.evaluate_event`. CSV writes use
+buffered `csv.writer`. No file I/O at module import time.
+
+### Phase 6 — Cross-Task Consistency
+
+* **Shared `geo.distance_m`** is the single point-to-point distance
+  helper used by `outlier_tolerance_evaluator`. Matches the
+  `accuracy_evaluator`, `multi_segment_evaluator`,
+  `smoothing_evaluator`, `cold_start_evaluator` conventions.
+* **Shared `_harness_helpers_implemented` skip gate**: all three new
+  scenarios use the same probe pattern as `test_ft_p_10_*`,
+  `test_ft_p_11_*`, `test_ft_n_02_*` — `NotImplementedError` on
+  `frame_source_replay`, `fdr_reader`, `imu_replay`,
+  `mavproxy_tlog_reader`, `sitl_observer` collapses to a single
+  `pytest.skip(...)` with a pointer to the relevant unit test.
+* **Constants centralised inside each module**: `MIN_OUTLIER_COUNT`,
+  `DRIFT_BUDGET_M`, `SWITCH_LATENCY_MS`, `STATUSTEXT_RATE_*_HZ`,
+  `ESCALATION_*` all sit at the top of their respective modules and
+  are imported as named constants in the unit tests. No magic numbers
+  inline.
+* **Source-label vocabulary**: `dead_reckoned` / `satellite_anchored`
+  are spelled identically across the three new evaluators and match
+  the prior batches (`sharp_turn_detector.ALLOWED_DURING_TURN_LABELS`,
+  `multi_segment_evaluator`, FDR schema in batch 67-68).
+* **STATUSTEXT regex strings**: `OPERATOR_RELOC_REQUEST` (FT-N-03),
+  `VISUAL_BLACKOUT_IMU_ONLY` (FT-N-04 AC-5),
+  `VISUAL_BLACKOUT_FAILSAFE` (FT-N-04 AC-7) match the spec verbatim;
+  unit-tested for substring presence + payload mismatch.
+
+### Phase 7 — Architecture Compliance
+
+* **Module placement**: all three evaluators live in
+  `e2e/runner/helpers/`; their unit tests in
+  `e2e/_unit_tests/helpers/`; their scenarios in
+  `e2e/tests/negative/`. Consistent with the AZ-406 layout and the
+  directory-layout invariant test (which now lists the three new
+  helpers + three new scenarios).
+* **No `src/gps_denied_onboard` imports** anywhere in the new code.
+  Verified by inspection — the evaluators only consume typed
+  dataclasses populated by the scenario from public-boundary
+  sources (FDR, mavproxy tlog, SITL state, injector manifests).
+* **Scenario gating**: each new scenario file uses
+  `pytest.skip(...)` with an explicit message pointing to the unit
+  test that covers the gated AC logic. This is the established
+  pattern from FT-P-07/08/09/10/11 and FT-N-02 — scenario coverage
+  comes online once the AZ-441 / AZ-407 / AZ-416 leftovers ship.
+
+## Test Results
+
+* New unit tests: 14 (outlier) + 18 (outage) + 29 (blackout-spoof) = **61 new tests**
+* Plus 6 new entries in the parametrized `test_required_path_exists`
+  (3 evaluator paths + 3 scenario paths) — counted toward the suite
+  total.
+* Full `e2e/_unit_tests` suite: **527 passed in 130 s** (previous
+  cumulative: 460 → +67 net).
+* Scenario collection for the three negative tests: 48 items collect
+  cleanly (parametrized across `fc_adapter × vio_strategy × {density |
+  window_seconds}`). The session-end `/e2e-results/evidence/per-nfr`
+  teardown error is the same pre-existing wart documented in batches
+  69-72 (nfr_recorder hardcoded path; not introduced by this batch).
diff --git a/_docs/_autodev_state.md b/_docs/_autodev_state.md
index 4cce34e..76ad66f 100644
--- a/_docs/_autodev_state.md
+++ b/_docs/_autodev_state.md
@@ -12,8 +12,10 @@ sub_step:
 retry_count: 0
 cycle: 1
 tracker: jira
-last_completed_batch: 72
+last_completed_batch: 73
 last_cumulative_review: batches_70-72
+current_batch: 74
+current_batch_tasks: ""
 last_step_outcomes:
   step_8: "Code is testable — no changes needed (testability_assessment.md committed; no list-of-changes, no source edits)"
   step_9: "Already complete — 41 blackbox test tasks (AZ-406..AZ-446) under epic AZ-262 with specs in _docs/02_tasks/todo/ were produced in a prior cycle; AZ-406 test-infrastructure bootstrap also pre-existing. Folder fallback satisfied (todo/ has test tasks, _dependencies_table.md reflects 114 product + 41 test = 155 total). No Step-9 work executed in cycle 1."
diff --git a/e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py b/e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py
new file mode 100644
index 0000000..cde73c8
--- /dev/null
+++ b/e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py
@@ -0,0 +1,588 @@
+"""Unit tests for `e2e/runner/helpers/blackout_spoof_evaluator.py` (AZ-426)."""
+
+from __future__ import annotations
+
+import csv
+from pathlib import Path
+
+from e2e.runner.helpers.blackout_spoof_evaluator import (
+    DEAD_RECKONED_LABEL,
+    ESCALATION_COV_2D_M,
+    ESCALATION_COV_FAILSAFE_M,
+    ESCALATION_DURATION_FAILSAFE_S,
+    ESCALATION_FIX_TYPE_2D,
+    ESCALATION_LATENCY_MS,
+    HONEST_ACCURACY_RATIO,
+    HORIZ_ACCURACY_FAILSAFE,
+    RECOVERY_STABLE_S,
+    SATELLITE_ANCHORED_LABEL,
+    STATUSTEXT_FAILSAFE,
+    STATUSTEXT_IMU_ONLY,
+    STATUSTEXT_RATE_MAX_HZ,
+    STATUSTEXT_RATE_MIN_HZ,
+    SWITCH_LATENCY_MS,
+    BlackoutWindow,
+    ConsistencyCheckEvent,
+    GpsHealthSample,
+    OutboundEstimateSample,
+    SpoofRejectedEvent,
+    StatustextSample,
+    evaluate,
+    evaluate_covariance_monotonic,
+    evaluate_escalation,
+    evaluate_honest_accuracy,
+    evaluate_recovery_gate,
+    evaluate_spoof_rejection,
+    evaluate_statustext_rate,
+    evaluate_switch_latency,
+    write_csv_evidence,
+)
+
+
+# Constants
+
+
+def test_constants_match_spec():
+    # AZ-426: AC-1 ≤400 ms, AC-4 ≥0.95×cov, AC-5 1-2 Hz, AC-6/7/8 thresholds.
+    assert SWITCH_LATENCY_MS == 400
+    assert HONEST_ACCURACY_RATIO == 0.95
+    assert STATUSTEXT_RATE_MIN_HZ == 1.0 and STATUSTEXT_RATE_MAX_HZ == 2.0
+    assert ESCALATION_COV_2D_M == 100.0
+    assert ESCALATION_COV_FAILSAFE_M == 500.0
+    assert ESCALATION_DURATION_FAILSAFE_S == 30.0
+    assert ESCALATION_FIX_TYPE_2D == 2
+    assert HORIZ_ACCURACY_FAILSAFE == 999.0
+    assert ESCALATION_LATENCY_MS == 500
+    assert RECOVERY_STABLE_S == 10.0
+    assert STATUSTEXT_IMU_ONLY == "VISUAL_BLACKOUT_IMU_ONLY"
+    assert STATUSTEXT_FAILSAFE == "VISUAL_BLACKOUT_FAILSAFE"
+    assert DEAD_RECKONED_LABEL == "dead_reckoned"
+    assert SATELLITE_ANCHORED_LABEL == "satellite_anchored"
+
+
+def _window(onset_ms: int = 10_000, duration_s: float = 5.0) -> BlackoutWindow:
+    return BlackoutWindow(
+        onset_monotonic_ms=onset_ms,
+        end_monotonic_ms=onset_ms + int(duration_s * 1000),
+    )
+
+
+def _est(
+    ms: int,
+    *,
+    label: str = DEAD_RECKONED_LABEL,
+    cov: float = 5.0,
+    horiz: float | None = None,
+    fix_type: int = 3,
+) -> OutboundEstimateSample:
+    return OutboundEstimateSample(
+        monotonic_ms=ms,
+        source_label=label,
+        cov_semi_major_m=cov,
+        horiz_accuracy=cov if horiz is None else horiz,
+        fix_type=fix_type,
+    )
+
+
+# AC-1 switch latency
+
+
+def test_switch_latency_within_400_ms_passes():
+    # Arrange
+    w = _window()
+    estimates = [
+        _est(w.onset_monotonic_ms - 100, label=SATELLITE_ANCHORED_LABEL),
+        _est(w.onset_monotonic_ms + 350),
+    ]
+
+    # Act
+    report = evaluate_switch_latency(w, estimates, frame_period_ms=33)
+
+    # Assert — budget is min(400, 33) = 33 ms; 350 > 33 → fails.
+    assert report.first_dead_reckoned_offset_ms == 350
+    assert report.passes is False
+
+
+def test_switch_latency_within_one_frame_passes():
+    # Arrange — frame period 100 ms, dead_reckoned at +50 ms → within both bounds.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 50)]
+
+    # Act
+    report = evaluate_switch_latency(w, estimates, frame_period_ms=100)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_switch_latency_at_one_frame_boundary_passes():
+    # Arrange — exact frame-period boundary.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 100)]
+
+    # Act
+    report = evaluate_switch_latency(w, estimates, frame_period_ms=100)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_switch_latency_missing_dead_reckoned_fails():
+    # Arrange — no dead_reckoned emission.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 50, label=SATELLITE_ANCHORED_LABEL)]
+
+    # Act
+    report = evaluate_switch_latency(w, estimates, frame_period_ms=100)
+
+    # Assert
+    assert report.first_dead_reckoned_offset_ms is None
+    assert report.passes is False
+
+
+# AC-2 spoof rejection
+
+
+def test_spoof_rejection_pass():
+    # Arrange — spoof events present, no satellite_anchored inside window.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 500)]
+    spoof_events = [SpoofRejectedEvent(monotonic_ms=w.onset_monotonic_ms + 200, reason="delta>500m")]
+
+    # Act
+    report = evaluate_spoof_rejection(w, estimates, spoof_events)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_spoof_rejection_no_events_fails():
+    # Arrange
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 500)]
+
+    # Act
+    report = evaluate_spoof_rejection(w, estimates, spoof_events=[])
+
+    # Assert
+    assert report.passes is False
+
+
+def test_spoof_rejection_label_returns_to_satellite_fails():
+    # Arrange — spoof event present BUT label returns to satellite_anchored inside window.
+    w = _window()
+    estimates = [
+        _est(w.onset_monotonic_ms + 100),
+        _est(w.onset_monotonic_ms + 1000, label=SATELLITE_ANCHORED_LABEL),
+    ]
+    spoof_events = [SpoofRejectedEvent(monotonic_ms=w.onset_monotonic_ms + 50, reason="x")]
+
+    # Act
+    report = evaluate_spoof_rejection(w, estimates, spoof_events)
+
+    # Assert
+    assert report.satellite_anchored_inside_window == 1
+    assert report.passes is False
+
+
+# AC-3 covariance monotonic
+
+
+def test_covariance_monotonic_pass():
+    # Arrange
+    w = _window()
+    estimates = [
+        _est(w.onset_monotonic_ms + 100, cov=5.0),
+        _est(w.onset_monotonic_ms + 200, cov=5.5),
+        _est(w.onset_monotonic_ms + 300, cov=6.0),
+    ]
+
+    # Act
+    report = evaluate_covariance_monotonic(w, estimates)
+
+    # Assert
+    assert report.passes is True
+    assert report.first_decreasing_at_ms is None
+
+
+def test_covariance_monotonic_decreasing_fails():
+    # Arrange
+    w = _window()
+    estimates = [
+        _est(w.onset_monotonic_ms + 100, cov=5.0),
+        _est(w.onset_monotonic_ms + 200, cov=4.0),
+    ]
+
+    # Act
+    report = evaluate_covariance_monotonic(w, estimates)
+
+    # Assert
+    assert report.first_decreasing_at_ms == w.onset_monotonic_ms + 200
+    assert report.passes is False
+
+
+# AC-4 honest accuracy
+
+
+def test_honest_accuracy_pass():
+    # Arrange — horiz_accuracy = cov ≥ 0.95 × cov.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 100, cov=10.0, horiz=10.0)]
+
+    # Act
+    report = evaluate_honest_accuracy(w, estimates)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_honest_accuracy_boundary_pass():
+    # Arrange — horiz_accuracy = 0.95 × cov exactly.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 100, cov=10.0, horiz=9.5)]
+
+    # Act
+    report = evaluate_honest_accuracy(w, estimates)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_honest_accuracy_violation_fails():
+    # Arrange — horiz_accuracy = 0.90 × cov.
+    w = _window()
+    estimates = [_est(w.onset_monotonic_ms + 100, cov=10.0, horiz=9.0)]
+
+    # Act
+    report = evaluate_honest_accuracy(w, estimates)
+
+    # Assert
+    assert report.violation_count == 1
+    assert report.passes is False
+
+
+# AC-5 STATUSTEXT rate
+
+
+def test_statustext_rate_pass_at_1hz():
+    # Arrange — 5 STATUSTEXTs over 5 s = 1 Hz.
+    w = _window(duration_s=5.0)
+    sts = [
+        StatustextSample(monotonic_ms=w.onset_monotonic_ms + i * 1000, text=STATUSTEXT_IMU_ONLY)
+        for i in range(5)
+    ]
+
+    # Act
+    report = evaluate_statustext_rate(w, sts)
+
+    # Assert
+    assert report.observed_hz is not None and abs(report.observed_hz - 1.0) < 1e-6
+    assert report.passes is True
+
+
+def test_statustext_rate_pass_at_2hz():
+    # Arrange — 10 STATUSTEXTs over 5 s = 2 Hz.
+    w = _window(duration_s=5.0)
+    sts = [
+        StatustextSample(monotonic_ms=w.onset_monotonic_ms + i * 500, text=STATUSTEXT_IMU_ONLY)
+        for i in range(10)
+    ]
+
+    # Act
+    report = evaluate_statustext_rate(w, sts)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_statustext_rate_too_slow_fails():
+    # Arrange — 2 STATUSTEXTs over 5 s = 0.4 Hz.
+    w = _window(duration_s=5.0)
+    sts = [
+        StatustextSample(monotonic_ms=w.onset_monotonic_ms + i * 2000, text=STATUSTEXT_IMU_ONLY)
+        for i in range(2)
+    ]
+
+    # Act
+    report = evaluate_statustext_rate(w, sts)
+
+    # Assert
+    assert report.passes is False
+
+
+def test_statustext_rate_too_fast_fails():
+    # Arrange — 30 STATUSTEXTs over 5 s = 6 Hz.
+    w = _window(duration_s=5.0)
+    sts = [
+        StatustextSample(monotonic_ms=w.onset_monotonic_ms + int(i * 5000 / 30), text=STATUSTEXT_IMU_ONLY)
+        for i in range(30)
+    ]
+
+    # Act
+    report = evaluate_statustext_rate(w, sts)
+
+    # Assert
+    assert report.observed_hz is not None and report.observed_hz > STATUSTEXT_RATE_MAX_HZ
+    assert report.passes is False
+
+
+# AC-6 / AC-7 escalation (35 s window)
+
+
+def _make_35s_window(onset_ms: int = 10_000) -> BlackoutWindow:
+    return _window(onset_ms=onset_ms, duration_s=35.0)
+
+
+def test_escalation_non_35s_window_passes_vacuously():
+    # Arrange — 5 s window with no escalation expected.
+    w = _window(duration_s=5.0)
+    estimates = [_est(w.onset_monotonic_ms + 100, cov=10.0)]
+
+    # Act
+    report = evaluate_escalation(w, estimates, statustexts=[], is_35s_window=False)
+
+    # Assert
+    assert report.passes is True
+
+
+def test_escalation_35s_no_crossings_passes():
+    # Arrange — covariance stays below ESCALATION_COV_2D_M for whole window.
+    w = _make_35s_window()
+    estimates = [
+        _est(w.onset_monotonic_ms + i * 1000, cov=10.0 + i * 0.1)
+        for i in range(30)
+    ]
+
+    # Act
+    report = evaluate_escalation(w, estimates, statustexts=[], is_35s_window=True)
+
+    # Assert — duration crossing at 30 s alone still triggers AC-7 path; no
+    # failsafe STATUSTEXT → fails AC-7.
+    assert report.cov500_or_30s_crossed is True
+    assert report.passes_ac7 is False
+
+
+def test_escalation_35s_ac6_fix_type_degraded_passes():
+    # Arrange — cov crosses 100 m at 5 s; fix_type drops to 2 from then on.
+    w = _make_35s_window()
+    estimates = []
+    for i in range(30):
+        t = w.onset_monotonic_ms + i * 1000
+        cov = 50.0 if i < 5 else 150.0
+        fix = 3 if i < 5 else 2
+        estimates.append(_est(t, cov=cov, horiz=cov, fix_type=fix))
+    # Provide failsafe STATUSTEXT at +30 s (within ESCALATION_LATENCY_MS of duration breach).
+    failsafe_at = w.onset_monotonic_ms + int(ESCALATION_DURATION_FAILSAFE_S * 1000)
+    statustexts = [
+        StatustextSample(monotonic_ms=failsafe_at + 100, text=STATUSTEXT_FAILSAFE)
+    ]
+    # All post-failsafe-trigger samples need horiz_accuracy == 999.
+    for i in range(30):
+        if estimates[i].monotonic_ms >= failsafe_at:
+            estimates[i] = OutboundEstimateSample(
+                monotonic_ms=estimates[i].monotonic_ms,
+                source_label=DEAD_RECKONED_LABEL,
+                cov_semi_major_m=estimates[i].cov_semi_major_m,
+                horiz_accuracy=HORIZ_ACCURACY_FAILSAFE,
+                fix_type=2,
+            )
+
+    # Act
+    report = evaluate_escalation(w, estimates, statustexts, is_35s_window=True)
+
+    # Assert
+    assert report.passes_ac6 is True
+    assert report.passes_ac7 is True
+
+
+def test_escalation_35s_ac6_fix_type_not_degraded_fails():
+    # Arrange — cov crosses 100 m but fix_type stays 3.
+    w = _make_35s_window()
+    estimates = []
+    for i in range(30):
+        cov = 50.0 if i < 5 else 150.0
+        estimates.append(_est(w.onset_monotonic_ms + i * 1000, cov=cov, fix_type=3))
+
+    # Act
+    report = evaluate_escalation(w, estimates, statustexts=[], is_35s_window=True)
+
+    # Assert
+    assert report.passes_ac6 is False
+
+
+def test_escalation_35s_ac7_horiz_not_999_fails():
+    # Arrange — failsafe trigger reached but horiz_accuracy != 999.
+    w = _make_35s_window()
+    estimates = []
+    for i in range(35):
+        cov = 50.0 + i * 20.0  # crosses 100 then 500.
+ estimates.append(_est(w.onset_monotonic_ms + i * 1000, cov=cov, horiz=cov, fix_type=2)) + failsafe_at = w.onset_monotonic_ms + int(ESCALATION_DURATION_FAILSAFE_S * 1000) + statustexts = [ + StatustextSample(monotonic_ms=failsafe_at + 100, text=STATUSTEXT_FAILSAFE) + ] + + # Act + report = evaluate_escalation(w, estimates, statustexts, is_35s_window=True) + + # Assert + assert report.horiz_accuracy_999 is False + assert report.passes_ac7 is False + + +# AC-8 recovery gate + + +def _post_window(w: BlackoutWindow) -> tuple[int, int]: + return w.end_monotonic_ms, w.end_monotonic_ms + int(RECOVERY_STABLE_S * 1000) + 500 + + +def test_recovery_gate_pass(): + # Arrange — 12 s of healthy GPS + consistency pass + then satellite_anchored emission. + w = _window() + end, recovery = _post_window(w) + estimates = [ + _est(end + 500), + _est(recovery + 100, label=SATELLITE_ANCHORED_LABEL), + ] + gps_health = [ + GpsHealthSample(monotonic_ms=end + i * 1000, healthy=True, spoofed=False) + for i in range(12) + ] + consistency = [ConsistencyCheckEvent(monotonic_ms=end + 5000, passed=True)] + + # Act + report = evaluate_recovery_gate(w, estimates, gps_health, consistency) + + # Assert + assert report.passes is True + assert report.recovery_at_ms == recovery + 100 + + +def test_recovery_gate_unstable_fails(): + # Arrange — GPS goes unhealthy mid-stability window. + w = _window() + end, recovery = _post_window(w) + estimates = [_est(recovery + 100, label=SATELLITE_ANCHORED_LABEL)] + gps_health = [ + GpsHealthSample(monotonic_ms=end + i * 1000, healthy=(i != 5), spoofed=False) + for i in range(12) + ] + consistency = [ConsistencyCheckEvent(monotonic_ms=end + 5000, passed=True)] + + # Act + report = evaluate_recovery_gate(w, estimates, gps_health, consistency) + + # Assert + assert report.passes is False + + +def test_recovery_gate_spoofed_fails(): + # Arrange — GPS healthy but spoofed=True for one sample. 
+ w = _window() + end, recovery = _post_window(w) + estimates = [_est(recovery + 100, label=SATELLITE_ANCHORED_LABEL)] + gps_health = [ + GpsHealthSample(monotonic_ms=end + i * 1000, healthy=True, spoofed=(i == 3)) + for i in range(12) + ] + consistency = [ConsistencyCheckEvent(monotonic_ms=end + 5000, passed=True)] + + # Act + report = evaluate_recovery_gate(w, estimates, gps_health, consistency) + + # Assert + assert report.passes is False + + +def test_recovery_gate_no_consistency_check_fails(): + # Arrange + w = _window() + end, recovery = _post_window(w) + estimates = [_est(recovery + 100, label=SATELLITE_ANCHORED_LABEL)] + gps_health = [ + GpsHealthSample(monotonic_ms=end + i * 1000, healthy=True, spoofed=False) + for i in range(12) + ] + + # Act + report = evaluate_recovery_gate(w, estimates, gps_health, consistency_checks=[]) + + # Assert + assert report.consistency_check_passed is False + assert report.passes is False + + +def test_recovery_gate_no_recovery_attempt_vacuous_pass(): + # Arrange — no satellite_anchored post-window. 
+ w = _window() + estimates = [_est(w.end_monotonic_ms + 500)] + + # Act + report = evaluate_recovery_gate(w, estimates, gps_health=[], consistency_checks=[]) + + # Assert + assert report.recovery_at_ms is None + assert report.passes is True + + +# Aggregate evaluate + CSV evidence + + +def _make_passing_5s_inputs() -> dict: + w = _window(duration_s=5.0) + end, recovery = _post_window(w) + estimates = [_est(w.onset_monotonic_ms + 50)] + estimates.extend( + _est(w.onset_monotonic_ms + 100 + i * 100, cov=10.0 + i * 0.1) + for i in range(20) + ) + estimates.append(_est(recovery + 100, label=SATELLITE_ANCHORED_LABEL)) + statustexts = [ + StatustextSample(monotonic_ms=w.onset_monotonic_ms + i * 1000, text=STATUSTEXT_IMU_ONLY) + for i in range(5) + ] + spoof_events = [SpoofRejectedEvent(monotonic_ms=w.onset_monotonic_ms + 50, reason="x")] + gps_health = [ + GpsHealthSample(monotonic_ms=end + i * 1000, healthy=True, spoofed=False) + for i in range(12) + ] + consistency = [ConsistencyCheckEvent(monotonic_ms=end + 5000, passed=True)] + return dict( + window=w, + estimates=estimates, + statustexts=statustexts, + spoof_events=spoof_events, + gps_health=gps_health, + consistency_checks=consistency, + frame_period_ms=100, + is_35s_window=False, + ) + + +def test_evaluate_5s_all_pass(): + # Arrange + inputs = _make_passing_5s_inputs() + + # Act + report = evaluate(**inputs) + + # Assert + assert report.passes is True + + +def test_write_csv_evidence_round_trips(tmp_path: Path): + # Arrange + inputs = _make_passing_5s_inputs() + report = evaluate(**inputs) + + # Act + out = write_csv_evidence(tmp_path / "ft-n-04.csv", report) + + # Assert + with out.open() as fh: + rows = list(csv.DictReader(fh)) + assert len(rows) == 1 + assert rows[0]["passes"] == "true" + assert rows[0]["ac1_passes"] == "true" + assert rows[0]["ac2_passes"] == "true" diff --git a/e2e/_unit_tests/helpers/test_outage_request_evaluator.py b/e2e/_unit_tests/helpers/test_outage_request_evaluator.py new file mode 
100644 index 0000000..5273b8e --- /dev/null +++ b/e2e/_unit_tests/helpers/test_outage_request_evaluator.py @@ -0,0 +1,353 @@ +"""Unit tests for `e2e/runner/helpers/outage_request_evaluator.py` (AZ-425).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +from e2e.runner.helpers.outage_request_evaluator import ( + DEAD_RECKONED_LABEL, + MIN_OUTAGE_FRAMES, + OUTAGE_THRESHOLD_S, + STATUSTEXT_REGEX, + TOLERANCE_S, + EkfDivergenceEvent, + OutboundEstimateSample, + StatustextSample, + detect_outage_windows, + evaluate, + evaluate_window, + write_csv_evidence, +) + + +# Constants + + +def test_constants_match_spec(): + # AZ-425: AC-1 ≥3 frames; AC-2 2 s ±500 ms; AC-3 dead_reckoned label. + assert MIN_OUTAGE_FRAMES == 3 + assert OUTAGE_THRESHOLD_S == 2.0 + assert TOLERANCE_S == 0.5 + assert STATUSTEXT_REGEX == "OPERATOR_RELOC_REQUEST" + assert DEAD_RECKONED_LABEL == "dead_reckoned" + + +# detect_outage_windows + + +def _est(frame: int, label: str = "satellite_anchored", ms: int = 0) -> OutboundEstimateSample: + return OutboundEstimateSample( + frame_idx=frame, + monotonic_ms=ms if ms else frame * 100, + source_label=label, + ) + + +def test_detect_no_outage_returns_empty(): + # Arrange — full frame sequence with all estimates. + expected = list(range(10)) + estimates = [_est(i) for i in expected] + + # Act + windows = detect_outage_windows(expected, estimates, frame_period_ms=100) + + # Assert + assert windows == [] + + +def test_detect_run_below_min_length_ignored(): + # Arrange — 2-frame gap is below MIN_OUTAGE_FRAMES=3. + expected = list(range(10)) + estimates = [_est(i) for i in expected if i not in (4, 5)] + + # Act + windows = detect_outage_windows(expected, estimates, frame_period_ms=100) + + # Assert + assert windows == [] + + +def test_detect_single_outage_window(): + # Arrange — 3-frame gap at indices 4,5,6. 
+ expected = list(range(10)) + estimates = [_est(i) for i in expected if i not in (4, 5, 6)] + + # Act + windows = detect_outage_windows( + expected, estimates, frame_period_ms=100, replay_start_monotonic_ms=1000 + ) + + # Assert + assert len(windows) == 1 + w = windows[0] + assert w.first_missing_frame_idx == 4 + assert w.last_missing_frame_idx == 6 + assert w.length_frames == 3 + assert w.onset_monotonic_ms == 1000 + 4 * 100 # 1400 + assert w.end_monotonic_ms == 1000 + 7 * 100 # 1700 + assert w.duration_ms == 300 + + +def test_detect_multiple_windows(): + # Arrange — two gaps: 4-6 and 12-15. + expected = list(range(20)) + skip = {4, 5, 6, 12, 13, 14, 15} + estimates = [_est(i) for i in expected if i not in skip] + + # Act + windows = detect_outage_windows(expected, estimates, frame_period_ms=100) + + # Assert + assert len(windows) == 2 + assert windows[0].first_missing_frame_idx == 4 and windows[0].length_frames == 3 + assert windows[1].first_missing_frame_idx == 12 and windows[1].length_frames == 4 + + +def test_detect_trailing_outage_window(): + # Arrange — gap at the end of the sequence. + expected = list(range(10)) + estimates = [_est(i) for i in expected if i < 7] + + # Act + windows = detect_outage_windows(expected, estimates, frame_period_ms=100) + + # Assert + assert len(windows) == 1 + assert windows[0].first_missing_frame_idx == 7 + assert windows[0].last_missing_frame_idx == 9 + + +# evaluate_window — AC-2 STATUSTEXT timing + + +def _window_at(onset_ms: int, length: int = 3, period_ms: int = 100): + # Ensure expected sequence is long enough to fully contain the gap + a trailing frame. 
+ total = max(20, length + 5) + expected = list(range(total)) + skip = set(range(2, 2 + length)) + estimates = [_est(i, ms=i * period_ms) for i in expected if i not in skip] + [w] = detect_outage_windows( + expected, + estimates, + frame_period_ms=period_ms, + replay_start_monotonic_ms=onset_ms - 2 * period_ms, + ) + return w, estimates + + +def test_statustext_within_tolerance_passes(): + # Arrange — STATUSTEXT exactly at onset+2 s. + window, estimates = _window_at(onset_ms=10_000, length=30, period_ms=100) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 2000, text="OPERATOR_RELOC_REQUEST"), + ] + + # Act + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Assert + assert report.statustext_offset_ms == 2000 + assert report.passes_statustext is True + + +def test_statustext_within_tolerance_late_passes(): + # Arrange — STATUSTEXT at onset+2.4 s (within ±500 ms). + window, estimates = _window_at(onset_ms=10_000, length=30) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 2400, text="OPERATOR_RELOC_REQUEST"), + ] + + # Act + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Assert + assert report.passes_statustext is True + + +def test_statustext_too_early_fails(): + # Arrange — STATUSTEXT at onset+1.0 s (before 1.5 s lower bound). + window, estimates = _window_at(onset_ms=10_000, length=30) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 1000, text="OPERATOR_RELOC_REQUEST"), + ] + + # Act + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Assert + assert report.statustext_offset_ms == 1000 + assert report.passes_statustext is False + + +def test_statustext_too_late_fails(): + # Arrange — STATUSTEXT at onset+3.0 s (beyond 2.5 s upper bound). 
+ window, estimates = _window_at(onset_ms=10_000, length=30) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 3000, text="OPERATOR_RELOC_REQUEST"), + ] + + # Act + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Assert + assert report.passes_statustext is False + + +def test_statustext_missing_fails(): + # Arrange + window, estimates = _window_at(onset_ms=10_000, length=30) + + # Act + report = evaluate_window(window, estimates, statustexts=[], ekf_events=[]) + + # Assert + assert report.statustext_offset_ms is None + assert report.passes_statustext is False + + +def test_statustext_payload_mismatch_fails(): + # Arrange — different STATUSTEXT message at the right time. + window, estimates = _window_at(onset_ms=10_000, length=30) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 2000, text="EKF_VARIANCE"), + ] + + # Act + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Assert + assert report.passes_statustext is False + + +# AC-3 dead_reckoned during outage + + +def test_dead_reckoned_during_window_passes(): + # Arrange — outage 4-6 with a dead_reckoned estimate at 500 ms (inside the window, tagged frame_idx=4). + expected = list(range(20)) + skip = {4, 5, 6} + estimates = [ + _est(i, ms=i * 100) + for i in expected + if i not in skip + ] + # Add dead_reckoned filler emission during the outage window. + estimates.append( + OutboundEstimateSample(frame_idx=4, monotonic_ms=500, source_label=DEAD_RECKONED_LABEL) + ) + [w] = detect_outage_windows(expected, [e for e in estimates if e.frame_idx not in {4, 5, 6} or e.source_label == "satellite_anchored"], frame_period_ms=100) + # Note: the filter above drops the dead_reckoned filler before detection, so the window still spans 4-6. + + # Act + report = evaluate_window(w, estimates, statustexts=[], ekf_events=[]) + + # Assert — at least one dead_reckoned emission with monotonic_ms in [onset_ms, end_ms].
+ assert report.dead_reckoned_count >= 1 + assert report.passes_dead_reckoned is True + + +def test_dead_reckoned_absent_fails(): + # Arrange + window, estimates = _window_at(onset_ms=10_000, length=3, period_ms=100) + + # Act + report = evaluate_window(window, estimates, statustexts=[], ekf_events=[]) + + # Assert + assert report.dead_reckoned_count == 0 + assert report.passes_dead_reckoned is False + + +# AC-4 EKF divergence + + +def test_ekf_divergence_during_window_fails(): + # Arrange + window, estimates = _window_at(onset_ms=10_000, length=30) + events = [ + EkfDivergenceEvent( + monotonic_ms=window.onset_monotonic_ms + 1000, reason="velocity_innov" + ) + ] + + # Act + report = evaluate_window(window, estimates, statustexts=[], ekf_events=events) + + # Assert + assert report.ekf_divergence_count == 1 + assert report.passes_ekf is False + + +def test_ekf_divergence_outside_window_ignored(): + # Arrange + window, estimates = _window_at(onset_ms=10_000, length=30) + events = [ + EkfDivergenceEvent( + monotonic_ms=window.end_monotonic_ms + 1000, reason="velocity_innov" + ) + ] + + # Act + report = evaluate_window(window, estimates, statustexts=[], ekf_events=events) + + # Assert + assert report.passes_ekf is True + + +# evaluate aggregate + + +def test_evaluate_all_pass(): + # Arrange — single outage with everything in order. 
+ expected = list(range(40)) + skip = set(range(10, 40)) + period_ms = 100 + estimates = [ + _est(i, ms=i * period_ms) + for i in expected + if i not in skip + ] + estimates.append( + OutboundEstimateSample( + frame_idx=10, monotonic_ms=10 * period_ms + 500, source_label=DEAD_RECKONED_LABEL + ) + ) + statustexts = [ + StatustextSample(monotonic_ms=10 * period_ms + 2000, text="OPERATOR_RELOC_REQUEST") + ] + + # Act + reports = evaluate( + expected, + estimates, + statustexts, + ekf_events=[], + frame_period_ms=period_ms, + ) + + # Assert + assert len(reports) == 1 + assert reports[0].passes is True + + +# CSV evidence + + +def test_write_csv_evidence_round_trips(tmp_path: Path): + # Arrange + window, estimates = _window_at(onset_ms=10_000, length=30) + statustexts = [ + StatustextSample(monotonic_ms=window.onset_monotonic_ms + 2000, text="OPERATOR_RELOC_REQUEST") + ] + report = evaluate_window(window, estimates, statustexts, ekf_events=[]) + + # Act + out = write_csv_evidence(tmp_path / "ft-n-03.csv", [report]) + + # Assert + with out.open() as fh: + rows = list(csv.DictReader(fh)) + assert len(rows) == 1 + assert rows[0]["passes_statustext"] == "true" + assert int(rows[0]["length_frames"]) == 30 diff --git a/e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py b/e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py new file mode 100644 index 0000000..691d7b0 --- /dev/null +++ b/e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py @@ -0,0 +1,330 @@ +"""Unit tests for `e2e/runner/helpers/outlier_tolerance_evaluator.py` (AZ-424).""" + +from __future__ import annotations + +import csv +from pathlib import Path + +import pytest + +from e2e.runner.helpers.outlier_tolerance_evaluator import ( + COVARIANCE_WINDOW_FRAMES, + DRIFT_BUDGET_M, + MIN_OUTLIER_COUNT, + GtPose, + OutboundEstimate, + OutlierEvent, + evaluate, + evaluate_event, + load_outlier_manifest, + write_csv_evidence, +) + + +# Constants + + +def test_constants_match_spec(): + # AC-2 budget 
+ AC-3 window + AC-1 minimum count, per AZ-424. + assert DRIFT_BUDGET_M == 50.0 + assert COVARIANCE_WINDOW_FRAMES == 3 + assert MIN_OUTLIER_COUNT == 10 + + +# Manifest loading + + +def _write_manifest(path: Path, rows: list[dict]) -> None: + fieldnames = [ + "frame_idx", + "src_jpeg_path", + "replacement_tile_x", + "replacement_tile_y", + "geodesic_offset_m", + "seed", + ] + with path.open("w", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=fieldnames) + writer.writeheader() + for r in rows: + row = {k: "" for k in fieldnames} + row.update(r) + writer.writerow(row) + + +def test_load_outlier_manifest_missing_file_raises(tmp_path: Path): + # Assert + with pytest.raises(FileNotFoundError, match="outlier manifest not found"): + load_outlier_manifest(tmp_path / "nope.csv") + + +def test_load_outlier_manifest_missing_columns_raises(tmp_path: Path): + # Arrange + p = tmp_path / "manifest.csv" + with p.open("w", newline="") as fh: + writer = csv.DictWriter(fh, fieldnames=["frame_idx", "src_jpeg_path"]) + writer.writeheader() + writer.writerow({"frame_idx": "1", "src_jpeg_path": "x.jpg"}) + + # Assert + with pytest.raises(ValueError, match="missing required columns"): + load_outlier_manifest(p) + + +def test_load_outlier_manifest_returns_events(tmp_path: Path): + # Arrange + p = tmp_path / "manifest.csv" + _write_manifest( + p, + [ + {"frame_idx": "10", "src_jpeg_path": "AD000011.jpg", "geodesic_offset_m": "412.5"}, + {"frame_idx": "20", "src_jpeg_path": "AD000021.jpg", "geodesic_offset_m": "381.0"}, + ], + ) + + # Act + events = load_outlier_manifest(p) + + # Assert + assert len(events) == 2 + assert events[0] == OutlierEvent( + frame_idx=10, geodesic_offset_m=412.5, src_jpeg_path="AD000011.jpg" + ) + assert events[1].frame_idx == 20 + + +# evaluate_event — AC-2 drift bound + + +def _est(frame: int, lat: float, lon: float, cov: float = 5.0) -> OutboundEstimate: + return OutboundEstimate( + frame_idx=frame, + monotonic_ms=frame * 100, + lat_deg=lat, + 
lon_deg=lon, + cov_semi_major_m=cov, + source_label="C3_VIO", + ) + + +def _gt(frame: int, lat: float, lon: float) -> GtPose: + return GtPose(frame_idx=frame, lat_deg=lat, lon_deg=lon) + + +def test_evaluate_event_drift_within_budget(): + # Arrange — before-frame estimate matches GT; after-frame is ~13 m off (within budget); outlier frame drifts. + estimates = { + 9: _est(9, 50.0000, 30.0000, cov=4.0), + 10: _est(10, 50.0050, 30.0050, cov=5.0), # outlier + 11: _est(11, 50.0001, 30.0001, cov=5.0), + } + gt = { + 9: _gt(9, 50.0000, 30.0000), + 10: _gt(10, 50.0001, 30.0001), + 11: _gt(11, 50.0002, 30.0002), + } + event = OutlierEvent(frame_idx=10, geodesic_offset_m=412.5, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.frame_idx == 10 + assert report.drift_m is not None + assert report.drift_m <= DRIFT_BUDGET_M + assert report.passes_drift is True + + +def test_evaluate_event_drift_exceeds_budget_fails(): + # Arrange — after-frame error is >> before-frame error. + estimates = { + 9: _est(9, 50.0000, 30.0000), + 10: _est(10, 50.0050, 30.0050), + 11: _est(11, 50.0010, 30.0010), # ~129 m off + } + gt = { + 9: _gt(9, 50.0000, 30.0000), + 10: _gt(10, 50.0001, 30.0001), + 11: _gt(11, 50.0000, 30.0000), + } + event = OutlierEvent(frame_idx=10, geodesic_offset_m=400.0, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.drift_m is not None and report.drift_m > DRIFT_BUDGET_M + assert report.passes_drift is False + assert report.passes is False + + +def test_evaluate_event_missing_neighbour_drift_none(): + # Arrange — only outlier frame present.
+ estimates = {10: _est(10, 50.0050, 30.0050)} + gt = {10: _gt(10, 50.0001, 30.0001)} + event = OutlierEvent(frame_idx=10, geodesic_offset_m=400.0, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.drift_m is None + assert report.passes_drift is False + + +# evaluate_event — AC-3 covariance monotonic + + +def test_evaluate_event_cov_monotonic_passes(): + # Arrange + estimates = { + 9: _est(9, 50.0, 30.0, cov=4.0), + 10: _est(10, 50.0, 30.0, cov=5.0), + 11: _est(11, 50.0, 30.0, cov=5.5), + } + gt = {f: _gt(f, 50.0, 30.0) for f in (9, 10, 11)} + event = OutlierEvent(frame_idx=10, geodesic_offset_m=400.0, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.cov_non_decreasing is True + assert report.passes_covariance is True + + +def test_evaluate_event_cov_decreasing_fails(): + # Arrange — outlier frame cov is lower than before frame. + estimates = { + 9: _est(9, 50.0, 30.0, cov=5.0), + 10: _est(10, 50.0, 30.0, cov=4.0), # decrease — violates AC-3 + 11: _est(11, 50.0, 30.0, cov=5.0), + } + gt = {f: _gt(f, 50.0, 30.0) for f in (9, 10, 11)} + event = OutlierEvent(frame_idx=10, geodesic_offset_m=400.0, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.cov_non_decreasing is False + assert report.passes_covariance is False + + +def test_evaluate_event_cov_flat_window_passes(): + # Arrange — equal covariances satisfy non-decreasing. 
+ estimates = { + 9: _est(9, 50.0, 30.0, cov=5.0), + 10: _est(10, 50.0, 30.0, cov=5.0), + 11: _est(11, 50.0, 30.0, cov=5.0), + } + gt = {f: _gt(f, 50.0, 30.0) for f in (9, 10, 11)} + event = OutlierEvent(frame_idx=10, geodesic_offset_m=400.0, src_jpeg_path="x.jpg") + + # Act + report = evaluate_event(event, estimates, gt) + + # Assert + assert report.cov_non_decreasing is True + + +# Aggregate evaluate — AC-1 minimum count + + +def test_evaluate_count_below_minimum_fails(): + # Arrange — only 5 outliers; AC-1 requires ≥10. + events = [ + OutlierEvent(frame_idx=i * 10, geodesic_offset_m=400.0, src_jpeg_path=f"x{i}.jpg") + for i in range(1, 6) + ] + estimates: list[OutboundEstimate] = [] + gt: list[GtPose] = [] + for ev in events: + for delta in (-1, 0, 1): + estimates.append(_est(ev.frame_idx + delta, 50.0, 30.0, cov=5.0)) + gt.append(_gt(ev.frame_idx + delta, 50.0, 30.0)) + + # Act + report = evaluate(events, estimates, gt) + + # Assert + assert report.total_outliers == 5 + assert report.passes_count is False + assert report.passes is False + + +def test_evaluate_count_at_minimum_passes_count_gate(): + # Arrange — exactly 10 outliers with non-violating drift/cov. + events = [ + OutlierEvent(frame_idx=i * 10, geodesic_offset_m=400.0, src_jpeg_path=f"x{i}.jpg") + for i in range(1, 11) + ] + estimates: list[OutboundEstimate] = [] + gt: list[GtPose] = [] + for ev in events: + for delta in (-1, 0, 1): + estimates.append(_est(ev.frame_idx + delta, 50.0, 30.0, cov=5.0)) + gt.append(_gt(ev.frame_idx + delta, 50.0, 30.0)) + + # Act + report = evaluate(events, estimates, gt) + + # Assert + assert report.total_outliers == 10 + assert report.passes_count is True + assert report.failed_event_count == 0 + assert report.passes is True + + +def test_evaluate_mixed_pass_fail_aggregates_correctly(): + # Arrange — 10 events, one with drift violation. 
+ events = [ + OutlierEvent(frame_idx=i * 10, geodesic_offset_m=400.0, src_jpeg_path=f"x{i}.jpg") + for i in range(1, 11) + ] + estimates: list[OutboundEstimate] = [] + gt: list[GtPose] = [] + for ev in events: + for delta in (-1, 0, 1): + estimates.append(_est(ev.frame_idx + delta, 50.0, 30.0, cov=5.0)) + gt.append(_gt(ev.frame_idx + delta, 50.0, 30.0)) + # Override frame 31 to be 200 m off — produces drift > 50 m for event at frame_idx=30. + estimates = [e for e in estimates if e.frame_idx != 31] + estimates.append(_est(31, 50.0018, 30.0, cov=5.0)) # ≈200 m off + + # Act + report = evaluate(events, estimates, gt) + + # Assert + assert report.total_outliers == 10 + assert report.failed_event_count == 1 + assert report.passes is False + + +# CSV evidence writer + + +def test_write_csv_evidence_round_trips(tmp_path: Path): + # Arrange + events = [ + OutlierEvent(frame_idx=10, geodesic_offset_m=412.5, src_jpeg_path="AD000011.jpg"), + OutlierEvent(frame_idx=20, geodesic_offset_m=381.0, src_jpeg_path="AD000021.jpg"), + ] + estimates: list[OutboundEstimate] = [] + gt: list[GtPose] = [] + for ev in events: + for delta in (-1, 0, 1): + estimates.append(_est(ev.frame_idx + delta, 50.0, 30.0, cov=5.0)) + gt.append(_gt(ev.frame_idx + delta, 50.0, 30.0)) + report = evaluate(events, estimates, gt) + + # Act + out = write_csv_evidence(tmp_path / "ft_n_01_evidence.csv", report) + + # Assert + assert out.exists() + with out.open() as fh: + rows = list(csv.DictReader(fh)) + assert [int(r["frame_idx"]) for r in rows] == [10, 20] + assert all(r["passes"] == "true" for r in rows) + assert all(r["cov_non_decreasing"] == "true" for r in rows) diff --git a/e2e/_unit_tests/test_directory_layout.py b/e2e/_unit_tests/test_directory_layout.py index ec3c8c6..e826468 100644 --- a/e2e/_unit_tests/test_directory_layout.py +++ b/e2e/_unit_tests/test_directory_layout.py @@ -52,6 +52,9 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "runner/helpers/msp_frame_observer.py", 
"runner/helpers/ap_contract_evaluator.py", "runner/helpers/cold_start_evaluator.py", + "runner/helpers/outlier_tolerance_evaluator.py", + "runner/helpers/outage_request_evaluator.py", + "runner/helpers/blackout_spoof_evaluator.py", "fixtures/mock-suite-sat/Dockerfile", "fixtures/mock-suite-sat/app.py", "fixtures/mock-suite-sat/requirements.txt", @@ -96,7 +99,10 @@ E2E_ROOT = Path(__file__).resolve().parents[1] "tests/positive/test_ft_p_09_inav.py", "tests/positive/test_ft_p_10_smoothing_lookback.py", "tests/positive/test_ft_p_11_cold_start_init.py", + "tests/negative/test_ft_n_01_outlier_tolerance.py", "tests/negative/test_ft_n_02_sharp_turn_failure.py", + "tests/negative/test_ft_n_03_outage_reloc.py", + "tests/negative/test_ft_n_04_blackout_spoof.py", ], ) def test_required_path_exists(relative_path: str) -> None: diff --git a/e2e/runner/helpers/blackout_spoof_evaluator.py b/e2e/runner/helpers/blackout_spoof_evaluator.py new file mode 100644 index 0000000..c360a61 --- /dev/null +++ b/e2e/runner/helpers/blackout_spoof_evaluator.py @@ -0,0 +1,557 @@ +"""Blackout-spoof evaluation for FT-N-04 (AZ-426 / AC-3.5 + AC-NEW-8). + +Three-window ladder (5 s / 15 s / 35 s) with the +``blackout_spoof.py`` injector + FC-inbound spoof proxy. The +evaluator validates per AZ-426: + +* AC-1: switch latency — within ≤1 frame OR ≤``SWITCH_LATENCY_MS`` + (whichever is shorter), the first outbound estimate after blackout + onset carries ``source_label = dead_reckoned``. +* AC-2: spoof rejection — at least one FDR ``spoof-rejected`` event + is observed during the blackout window AND zero spoofed GPS records + are consumed into the estimator (label never returns to + ``satellite_anchored`` during the window). +* AC-3: monotonic covariance — ``cov_semi_major_m`` is non-decreasing + across consecutive emissions inside the window. +* AC-4: honest horiz_accuracy — + ``horiz_accuracy ≥ HONEST_ACCURACY_RATIO × cov_semi_major_m`` + for every emission. 
+* AC-5: STATUSTEXT 1-2 Hz — + ``VISUAL_BLACKOUT_IMU_ONLY`` STATUSTEXT rate is in + ``[STATUSTEXT_RATE_MIN_HZ, STATUSTEXT_RATE_MAX_HZ]`` throughout the + window. +* AC-6 (35 s only): when 95 % covariance crosses + ``ESCALATION_COV_2D_M``, fix_type degrades to ≤``ESCALATION_FIX_TYPE_2D``. +* AC-7 (35 s only): when 95 % covariance crosses + ``ESCALATION_COV_FAILSAFE_M`` OR window duration exceeds + ``ESCALATION_DURATION_FAILSAFE_S``, ``horiz_accuracy == + HORIZ_ACCURACY_FAILSAFE`` AND ``VISUAL_BLACKOUT_FAILSAFE`` + STATUSTEXT is emitted within ≤``ESCALATION_LATENCY_MS`` of the + crossing. +* AC-8: recovery gate — after blackout end, label only returns to + ``satellite_anchored`` once both (a) FC GPS-health is stable + + non-spoofed for ≥``RECOVERY_STABLE_S`` AND (b) a + visual/satellite consistency check succeeds. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. +""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Sequence + +# AC-1 +SWITCH_LATENCY_MS = 400 +# AC-2 +DEAD_RECKONED_LABEL = "dead_reckoned" +SATELLITE_ANCHORED_LABEL = "satellite_anchored" +# AC-4 +HONEST_ACCURACY_RATIO = 0.95 +# AC-5 +STATUSTEXT_IMU_ONLY = "VISUAL_BLACKOUT_IMU_ONLY" +STATUSTEXT_RATE_MIN_HZ = 1.0 +STATUSTEXT_RATE_MAX_HZ = 2.0 +# AC-6 / AC-7 +STATUSTEXT_FAILSAFE = "VISUAL_BLACKOUT_FAILSAFE" +ESCALATION_COV_2D_M = 100.0 +ESCALATION_COV_FAILSAFE_M = 500.0 +ESCALATION_DURATION_FAILSAFE_S = 30.0 +ESCALATION_FIX_TYPE_2D = 2 # MAVLink GPS_FIX_TYPE_2D +HORIZ_ACCURACY_FAILSAFE = 999.0 +ESCALATION_LATENCY_MS = 500 +# AC-8 +RECOVERY_STABLE_S = 10.0 + + +@dataclass(frozen=True) +class BlackoutWindow: + """The injector-emitted window the evaluator is bound to.""" + + onset_monotonic_ms: int + end_monotonic_ms: int + + @property + def duration_s(self) -> float: + return (self.end_monotonic_ms - self.onset_monotonic_ms) / 1000.0 + + +@dataclass(frozen=True) +class 
OutboundEstimateSample: + """One outbound estimate with fields used by FT-N-04 ACs.""" + + monotonic_ms: int + source_label: str + cov_semi_major_m: float + horiz_accuracy: float # AP GPS_INPUT.horiz_accuracy (m) + fix_type: int # MAVLink GPS fix type (0..6); -1 if unavailable + + +@dataclass(frozen=True) +class StatustextSample: + monotonic_ms: int + text: str + + +@dataclass(frozen=True) +class SpoofRejectedEvent: + """One FDR `spoof-rejected` event.""" + + monotonic_ms: int + reason: str + + +@dataclass(frozen=True) +class GpsHealthSample: + """FC-side GPS health sample (post-blackout, for recovery gate).""" + + monotonic_ms: int + healthy: bool + spoofed: bool + + +@dataclass(frozen=True) +class ConsistencyCheckEvent: + """Visual/satellite consistency check outcome (post-blackout).""" + + monotonic_ms: int + passed: bool + + +@dataclass(frozen=True) +class SwitchLatencyReport: + """AC-1 result.""" + + first_dead_reckoned_offset_ms: int | None # ms after window onset + frame_period_ms: int + passes: bool + + +@dataclass(frozen=True) +class SpoofRejectionReport: + """AC-2 result.""" + + spoof_rejected_count: int + satellite_anchored_inside_window: int + passes: bool + + +@dataclass(frozen=True) +class CovarianceMonotonicReport: + """AC-3 result.""" + + first_decreasing_at_ms: int | None + sample_count: int + passes: bool + + +@dataclass(frozen=True) +class HonestAccuracyReport: + """AC-4 result.""" + + violation_count: int + sample_count: int + passes: bool + + +@dataclass(frozen=True) +class StatustextRateReport: + """AC-5 result for VISUAL_BLACKOUT_IMU_ONLY.""" + + observed_hz: float | None + count: int + passes: bool + + +@dataclass(frozen=True) +class EscalationReport: + """AC-6 + AC-7 result (35 s window only — other windows return passes=True).""" + + cov2d_crossed: bool + cov2d_crossed_at_ms: int | None + fix_type_degraded: bool # AC-6 satisfied + cov500_or_30s_crossed: bool + cov500_or_30s_crossed_at_ms: int | None + horiz_accuracy_999: bool # AC-7 part 1 
+ failsafe_statustext_offset_ms: int | None + failsafe_statustext_in_time: bool # AC-7 part 2 + passes_ac6: bool + passes_ac7: bool + + @property + def passes(self) -> bool: + return self.passes_ac6 and self.passes_ac7 + + +@dataclass(frozen=True) +class RecoveryGateReport: + """AC-8 result.""" + + recovery_at_ms: int | None + stable_period_s: float | None + consistency_check_passed: bool + passes: bool + + +@dataclass(frozen=True) +class BlackoutSpoofReport: + """Aggregate FT-N-04 result for one window.""" + + window: BlackoutWindow + switch_latency: SwitchLatencyReport + spoof_rejection: SpoofRejectionReport + covariance_monotonic: CovarianceMonotonicReport + honest_accuracy: HonestAccuracyReport + statustext_rate: StatustextRateReport + escalation: EscalationReport + recovery_gate: RecoveryGateReport + + @property + def passes(self) -> bool: + return all( + ( + self.switch_latency.passes, + self.spoof_rejection.passes, + self.covariance_monotonic.passes, + self.honest_accuracy.passes, + self.statustext_rate.passes, + self.escalation.passes, + self.recovery_gate.passes, + ) + ) + + +def _inside_window(window: BlackoutWindow, t_ms: int) -> bool: + return window.onset_monotonic_ms <= t_ms <= window.end_monotonic_ms + + +def _samples_inside_window( + window: BlackoutWindow, samples: Iterable[OutboundEstimateSample] +) -> list[OutboundEstimateSample]: + return [s for s in samples if _inside_window(window, s.monotonic_ms)] + + +def evaluate_switch_latency( + window: BlackoutWindow, + estimates: Sequence[OutboundEstimateSample], + frame_period_ms: int, +) -> SwitchLatencyReport: + """AC-1: dead_reckoned label within ≤1 frame OR ≤SWITCH_LATENCY_MS.""" + budget_ms = min(SWITCH_LATENCY_MS, frame_period_ms) + offset: int | None = None + for s in estimates: + if s.monotonic_ms < window.onset_monotonic_ms: + continue + if s.source_label == DEAD_RECKONED_LABEL: + offset = s.monotonic_ms - window.onset_monotonic_ms + break + return SwitchLatencyReport( + 
first_dead_reckoned_offset_ms=offset, + frame_period_ms=frame_period_ms, + passes=offset is not None and offset <= budget_ms, + ) + + +def evaluate_spoof_rejection( + window: BlackoutWindow, + estimates: Sequence[OutboundEstimateSample], + spoof_events: Sequence[SpoofRejectedEvent], +) -> SpoofRejectionReport: + """AC-2: spoof-rejected events present AND no satellite_anchored re-entry.""" + rejected = sum( + 1 for ev in spoof_events if _inside_window(window, ev.monotonic_ms) + ) + inside = _samples_inside_window(window, estimates) + re_anchored = sum(1 for s in inside if s.source_label == SATELLITE_ANCHORED_LABEL) + return SpoofRejectionReport( + spoof_rejected_count=rejected, + satellite_anchored_inside_window=re_anchored, + passes=rejected >= 1 and re_anchored == 0, + ) + + +def evaluate_covariance_monotonic( + window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample] +) -> CovarianceMonotonicReport: + """AC-3: cov_semi_major_m non-decreasing across consecutive emissions.""" + inside = _samples_inside_window(window, estimates) + first_dec: int | None = None + for i in range(1, len(inside)): + if inside[i].cov_semi_major_m < inside[i - 1].cov_semi_major_m: + first_dec = inside[i].monotonic_ms + break + return CovarianceMonotonicReport( + first_decreasing_at_ms=first_dec, + sample_count=len(inside), + passes=first_dec is None and len(inside) >= 1, + ) + + +def evaluate_honest_accuracy( + window: BlackoutWindow, estimates: Sequence[OutboundEstimateSample] +) -> HonestAccuracyReport: + """AC-4: horiz_accuracy ≥ HONEST_ACCURACY_RATIO × cov_semi_major_m.""" + inside = _samples_inside_window(window, estimates) + violations = sum( + 1 + for s in inside + if s.horiz_accuracy < HONEST_ACCURACY_RATIO * s.cov_semi_major_m + ) + return HonestAccuracyReport( + violation_count=violations, + sample_count=len(inside), + passes=violations == 0 and len(inside) >= 1, + ) + + +def evaluate_statustext_rate( + window: BlackoutWindow, statustexts: Sequence[StatustextSample] +) 
-> StatustextRateReport: + """AC-5: VISUAL_BLACKOUT_IMU_ONLY rate ∈ [1, 2] Hz.""" + inside = [ + st + for st in statustexts + if STATUSTEXT_IMU_ONLY in st.text and _inside_window(window, st.monotonic_ms) + ] + duration_s = window.duration_s + if duration_s <= 0 or not inside: + return StatustextRateReport(observed_hz=None, count=len(inside), passes=False) + rate = len(inside) / duration_s + return StatustextRateReport( + observed_hz=rate, + count=len(inside), + passes=STATUSTEXT_RATE_MIN_HZ <= rate <= STATUSTEXT_RATE_MAX_HZ, + ) + + +def _first_cov_crossing_ms( + window: BlackoutWindow, + estimates: Sequence[OutboundEstimateSample], + threshold_m: float, +) -> int | None: + for s in _samples_inside_window(window, estimates): + if s.cov_semi_major_m >= threshold_m: + return s.monotonic_ms + return None + + +def evaluate_escalation( + window: BlackoutWindow, + estimates: Sequence[OutboundEstimateSample], + statustexts: Sequence[StatustextSample], + *, + is_35s_window: bool, +) -> EscalationReport: + """AC-6 + AC-7: applies only to the 35 s sub-case. + + For non-35 s windows the report is vacuously passing — those windows + are not expected to cross either escalation threshold and any + incidental crossing is treated as informational only. 
+ """ + cov2d_at = _first_cov_crossing_ms(window, estimates, ESCALATION_COV_2D_M) + cov500_at = _first_cov_crossing_ms(window, estimates, ESCALATION_COV_FAILSAFE_M) + duration_breach_at: int | None = None + if window.duration_s >= ESCALATION_DURATION_FAILSAFE_S: + duration_breach_at = ( + window.onset_monotonic_ms + + int(ESCALATION_DURATION_FAILSAFE_S * 1000) + ) + failsafe_trigger_at: int | None = None + if cov500_at is not None and duration_breach_at is not None: + failsafe_trigger_at = min(cov500_at, duration_breach_at) + else: + failsafe_trigger_at = cov500_at if cov500_at is not None else duration_breach_at + + if not is_35s_window: + return EscalationReport( + cov2d_crossed=cov2d_at is not None, + cov2d_crossed_at_ms=cov2d_at, + fix_type_degraded=True, + cov500_or_30s_crossed=failsafe_trigger_at is not None, + cov500_or_30s_crossed_at_ms=failsafe_trigger_at, + horiz_accuracy_999=True, + failsafe_statustext_offset_ms=None, + failsafe_statustext_in_time=True, + passes_ac6=True, + passes_ac7=True, + ) + + # AC-6: any sample at/after cov2d_at must have fix_type ≤ ESCALATION_FIX_TYPE_2D. + fix_degraded = True + if cov2d_at is not None: + post = [s for s in _samples_inside_window(window, estimates) if s.monotonic_ms >= cov2d_at] + if post and any(s.fix_type > ESCALATION_FIX_TYPE_2D for s in post): + fix_degraded = False + passes_ac6 = cov2d_at is None or fix_degraded + + # AC-7: post-trigger samples must have horiz_accuracy == 999 AND + # VISUAL_BLACKOUT_FAILSAFE STATUSTEXT must arrive within ≤500 ms of trigger. 
+ horiz_999 = True + failsafe_offset: int | None = None + failsafe_in_time = True + if failsafe_trigger_at is not None: + post = [s for s in _samples_inside_window(window, estimates) if s.monotonic_ms >= failsafe_trigger_at] + if post and any(s.horiz_accuracy != HORIZ_ACCURACY_FAILSAFE for s in post): + horiz_999 = False + for st in statustexts: + if STATUSTEXT_FAILSAFE not in st.text: + continue + if st.monotonic_ms < failsafe_trigger_at: + continue + offset = st.monotonic_ms - failsafe_trigger_at + if failsafe_offset is None or offset < failsafe_offset: + failsafe_offset = offset + failsafe_in_time = ( + failsafe_offset is not None and failsafe_offset <= ESCALATION_LATENCY_MS + ) + passes_ac7 = failsafe_trigger_at is None or (horiz_999 and failsafe_in_time) + + return EscalationReport( + cov2d_crossed=cov2d_at is not None, + cov2d_crossed_at_ms=cov2d_at, + fix_type_degraded=fix_degraded, + cov500_or_30s_crossed=failsafe_trigger_at is not None, + cov500_or_30s_crossed_at_ms=failsafe_trigger_at, + horiz_accuracy_999=horiz_999, + failsafe_statustext_offset_ms=failsafe_offset, + failsafe_statustext_in_time=failsafe_in_time, + passes_ac6=passes_ac6, + passes_ac7=passes_ac7, + ) + + +def evaluate_recovery_gate( + window: BlackoutWindow, + estimates: Sequence[OutboundEstimateSample], + gps_health: Sequence[GpsHealthSample], + consistency_checks: Sequence[ConsistencyCheckEvent], +) -> RecoveryGateReport: + """AC-8: recovery only after ≥10 s healthy/non-spoofed FC GPS AND a consistency check pass.""" + # First post-window satellite_anchored sample marks the (claimed) recovery moment. + recovery_at: int | None = None + for s in estimates: + if ( + s.monotonic_ms > window.end_monotonic_ms + and s.source_label == SATELLITE_ANCHORED_LABEL + ): + recovery_at = s.monotonic_ms + break + if recovery_at is None: + # No recovery attempted — vacuously passing for this gate; the + # caller can still flag it via window-level coverage. 
+ return RecoveryGateReport( + recovery_at_ms=None, + stable_period_s=None, + consistency_check_passed=False, + passes=True, + ) + + # (a) Continuous healthy/non-spoofed FC GPS for ≥RECOVERY_STABLE_S BEFORE recovery_at. + cutoff_ms = recovery_at - int(RECOVERY_STABLE_S * 1000) + relevant = [ + h for h in gps_health + if window.end_monotonic_ms <= h.monotonic_ms <= recovery_at + ] + stable = all(h.healthy and not h.spoofed for h in relevant) and len(relevant) >= 1 + earliest_relevant = min((h.monotonic_ms for h in relevant), default=recovery_at) + stable_period_s = (recovery_at - earliest_relevant) / 1000.0 + has_enough_window = earliest_relevant <= cutoff_ms + + # (b) Consistency check pass occurred between window-end and recovery_at. + consistency_passed = any( + c.passed and window.end_monotonic_ms <= c.monotonic_ms <= recovery_at + for c in consistency_checks + ) + + return RecoveryGateReport( + recovery_at_ms=recovery_at, + stable_period_s=stable_period_s, + consistency_check_passed=consistency_passed, + passes=stable and has_enough_window and consistency_passed, + ) + + +def evaluate( + window: BlackoutWindow, + *, + estimates: Sequence[OutboundEstimateSample], + statustexts: Sequence[StatustextSample], + spoof_events: Sequence[SpoofRejectedEvent], + gps_health: Sequence[GpsHealthSample], + consistency_checks: Sequence[ConsistencyCheckEvent], + frame_period_ms: int, + is_35s_window: bool, +) -> BlackoutSpoofReport: + """Run every AC-1..AC-8 check for a single window.""" + return BlackoutSpoofReport( + window=window, + switch_latency=evaluate_switch_latency(window, estimates, frame_period_ms), + spoof_rejection=evaluate_spoof_rejection(window, estimates, spoof_events), + covariance_monotonic=evaluate_covariance_monotonic(window, estimates), + honest_accuracy=evaluate_honest_accuracy(window, estimates), + statustext_rate=evaluate_statustext_rate(window, statustexts), + escalation=evaluate_escalation( + window, estimates, statustexts, is_35s_window=is_35s_window + ), + 
recovery_gate=evaluate_recovery_gate( + window, estimates, gps_health, consistency_checks + ), + ) + + +def write_csv_evidence(out_path: Path, report: BlackoutSpoofReport) -> Path: + """Write FT-N-04 aggregate evidence — one row of per-AC summary.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "window_duration_s", + "ac1_switch_latency_ms", + "ac1_passes", + "ac2_spoof_rejected_count", + "ac2_re_anchored_count", + "ac2_passes", + "ac3_first_decreasing_at_ms", + "ac3_passes", + "ac4_violation_count", + "ac4_passes", + "ac5_observed_hz", + "ac5_passes", + "ac6_cov2d_at_ms", + "ac6_passes", + "ac7_failsafe_trigger_at_ms", + "ac7_passes", + "ac8_recovery_at_ms", + "ac8_passes", + "passes", + ] + ) + r = report + writer.writerow( + [ + f"{r.window.duration_s:.3f}", + "" if r.switch_latency.first_dead_reckoned_offset_ms is None else r.switch_latency.first_dead_reckoned_offset_ms, + "true" if r.switch_latency.passes else "false", + r.spoof_rejection.spoof_rejected_count, + r.spoof_rejection.satellite_anchored_inside_window, + "true" if r.spoof_rejection.passes else "false", + "" if r.covariance_monotonic.first_decreasing_at_ms is None else r.covariance_monotonic.first_decreasing_at_ms, + "true" if r.covariance_monotonic.passes else "false", + r.honest_accuracy.violation_count, + "true" if r.honest_accuracy.passes else "false", + "" if r.statustext_rate.observed_hz is None else f"{r.statustext_rate.observed_hz:.3f}", + "true" if r.statustext_rate.passes else "false", + "" if r.escalation.cov2d_crossed_at_ms is None else r.escalation.cov2d_crossed_at_ms, + "true" if r.escalation.passes_ac6 else "false", + "" if r.escalation.cov500_or_30s_crossed_at_ms is None else r.escalation.cov500_or_30s_crossed_at_ms, + "true" if r.escalation.passes_ac7 else "false", + "" if r.recovery_gate.recovery_at_ms is None else r.recovery_gate.recovery_at_ms, + "true" if r.recovery_gate.passes 
else "false", + "true" if r.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/outage_request_evaluator.py b/e2e/runner/helpers/outage_request_evaluator.py new file mode 100644 index 0000000..5a829a3 --- /dev/null +++ b/e2e/runner/helpers/outage_request_evaluator.py @@ -0,0 +1,293 @@ +"""Outage-request evaluation for FT-N-03 (AZ-425 / AC-3.4). + +Detects sustained no-estimate outage windows from an outbound-estimate +stream, then evaluates: + +* AC-1: outage onset — ≥``MIN_OUTAGE_FRAMES`` consecutive missing frames. +* AC-2: STATUSTEXT containing ``OPERATOR_RELOC_REQUEST`` is emitted + within ``[OUTAGE_THRESHOLD_S − TOLERANCE_S, OUTAGE_THRESHOLD_S + + TOLERANCE_S]`` of outage onset. +* AC-3: during the outage window, the outbound stream emits at least + one estimate carrying ``source_label = dead_reckoned`` (IMU-extrapolated + propagation continues). +* AC-4: FC-side SITL state shows NO EKF divergence event during the + outage. + +A "no-estimate frame" is a frame_idx in the expected sequence with no +matching outbound-estimate record. Frame indices are expected to be +monotonic; ``expected_frame_indices`` is supplied by the caller so the +evaluator does not have to know the replay's total frame count. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. 
+""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, Sequence + +MIN_OUTAGE_FRAMES = 3 # AC-1 +OUTAGE_THRESHOLD_S = 2.0 # AC-2 +TOLERANCE_S = 0.5 # AC-2 ±500 ms window +STATUSTEXT_REGEX = "OPERATOR_RELOC_REQUEST" # AC-2 exact substring +DEAD_RECKONED_LABEL = "dead_reckoned" # AC-3 + + +@dataclass(frozen=True) +class OutboundEstimateSample: + """One outbound estimate keyed by frame index + monotonic time.""" + + frame_idx: int + monotonic_ms: int + source_label: str + + +@dataclass(frozen=True) +class StatustextSample: + """One STATUSTEXT message captured from mavproxy tlog.""" + + monotonic_ms: int + text: str + + +@dataclass(frozen=True) +class EkfDivergenceEvent: + """One EKF-divergence event observed via SITL state read.""" + + monotonic_ms: int + reason: str + + +@dataclass(frozen=True) +class OutageWindow: + """One detected outage window — contiguous run of missing frames.""" + + first_missing_frame_idx: int + last_missing_frame_idx: int + onset_monotonic_ms: int + end_monotonic_ms: int + + @property + def length_frames(self) -> int: + return self.last_missing_frame_idx - self.first_missing_frame_idx + 1 + + @property + def duration_ms(self) -> int: + return self.end_monotonic_ms - self.onset_monotonic_ms + + +@dataclass(frozen=True) +class OutageReport: + """AC-1 / AC-2 / AC-3 / AC-4 evaluation for one outage window.""" + + window: OutageWindow + passes_min_length: bool # AC-1 + statustext_offset_ms: int | None # AC-2: ms after onset, None if absent + passes_statustext: bool # AC-2 + dead_reckoned_count: int # AC-3 supporting metric + passes_dead_reckoned: bool # AC-3 + ekf_divergence_count: int # AC-4 supporting metric + passes_ekf: bool # AC-4 + + @property + def passes(self) -> bool: + return ( + self.passes_min_length + and self.passes_statustext + and self.passes_dead_reckoned + and self.passes_ekf + ) + + +def detect_outage_windows( + expected_frame_indices: 
Sequence[int], + estimates: Sequence[OutboundEstimateSample], + frame_period_ms: int, + replay_start_monotonic_ms: int = 0, +) -> list[OutageWindow]: + """Detect contiguous outage windows. + + A frame index in ``expected_frame_indices`` with no matching estimate + counts as missing. Runs of consecutive missing frames of length + ≥``MIN_OUTAGE_FRAMES`` become outage windows. + + ``frame_period_ms`` is the nominal inter-frame interval; onset/end + timestamps are derived as + ``replay_start_monotonic_ms + frame_idx * frame_period_ms``. The + timing fields are estimates — when actual capture timestamps are + available the caller should pass them via ``estimates`` and rely on + those for downstream timing checks. + """ + present = {e.frame_idx for e in estimates} + windows: list[OutageWindow] = [] + run_start: int | None = None + prev_idx: int | None = None + for idx in expected_frame_indices: + if idx not in present: + if run_start is None: + run_start = idx + prev_idx = idx + else: + if run_start is not None and prev_idx is not None: + run_length = prev_idx - run_start + 1 + if run_length >= MIN_OUTAGE_FRAMES: + windows.append( + OutageWindow( + first_missing_frame_idx=run_start, + last_missing_frame_idx=prev_idx, + onset_monotonic_ms=replay_start_monotonic_ms + + run_start * frame_period_ms, + end_monotonic_ms=replay_start_monotonic_ms + + (prev_idx + 1) * frame_period_ms, + ) + ) + run_start = None + prev_idx = None + # Trailing run. 
+ if run_start is not None and prev_idx is not None: + run_length = prev_idx - run_start + 1 + if run_length >= MIN_OUTAGE_FRAMES: + windows.append( + OutageWindow( + first_missing_frame_idx=run_start, + last_missing_frame_idx=prev_idx, + onset_monotonic_ms=replay_start_monotonic_ms + + run_start * frame_period_ms, + end_monotonic_ms=replay_start_monotonic_ms + + (prev_idx + 1) * frame_period_ms, + ) + ) + return windows + + +def _first_statustext_offset_ms( + window: OutageWindow, + statustexts: Iterable[StatustextSample], +) -> int | None: + """Return ms-offset of first OPERATOR_RELOC_REQUEST after onset, or None.""" + best: int | None = None + for st in statustexts: + if STATUSTEXT_REGEX not in st.text: + continue + if st.monotonic_ms < window.onset_monotonic_ms: + continue + offset = st.monotonic_ms - window.onset_monotonic_ms + if best is None or offset < best: + best = offset + return best + + +def _dead_reckoned_during_window( + window: OutageWindow, + estimates: Iterable[OutboundEstimateSample], +) -> int: + count = 0 + for e in estimates: + if ( + e.source_label == DEAD_RECKONED_LABEL + and window.onset_monotonic_ms <= e.monotonic_ms <= window.end_monotonic_ms + ): + count += 1 + return count + + +def _ekf_divergence_during_window( + window: OutageWindow, + events: Iterable[EkfDivergenceEvent], +) -> int: + count = 0 + for ev in events: + if window.onset_monotonic_ms <= ev.monotonic_ms <= window.end_monotonic_ms: + count += 1 + return count + + +def evaluate_window( + window: OutageWindow, + estimates: Sequence[OutboundEstimateSample], + statustexts: Sequence[StatustextSample], + ekf_events: Sequence[EkfDivergenceEvent], +) -> OutageReport: + """Compute AC-1..AC-4 evaluation for a single outage window.""" + offset = _first_statustext_offset_ms(window, statustexts) + threshold_ms = int(OUTAGE_THRESHOLD_S * 1000) + tolerance_ms = int(TOLERANCE_S * 1000) + passes_statustext = ( + offset is not None + and (threshold_ms - tolerance_ms) <= offset <= 
(threshold_ms + tolerance_ms) + ) + dr_count = _dead_reckoned_during_window(window, estimates) + ekf_count = _ekf_divergence_during_window(window, ekf_events) + return OutageReport( + window=window, + passes_min_length=window.length_frames >= MIN_OUTAGE_FRAMES, + statustext_offset_ms=offset, + passes_statustext=passes_statustext, + dead_reckoned_count=dr_count, + passes_dead_reckoned=dr_count >= 1, + ekf_divergence_count=ekf_count, + passes_ekf=ekf_count == 0, + ) + + +def evaluate( + expected_frame_indices: Sequence[int], + estimates: Sequence[OutboundEstimateSample], + statustexts: Sequence[StatustextSample], + ekf_events: Sequence[EkfDivergenceEvent], + frame_period_ms: int, + replay_start_monotonic_ms: int = 0, +) -> list[OutageReport]: + """Detect outage windows and evaluate each.""" + windows = detect_outage_windows( + expected_frame_indices, + estimates, + frame_period_ms=frame_period_ms, + replay_start_monotonic_ms=replay_start_monotonic_ms, + ) + return [evaluate_window(w, estimates, statustexts, ekf_events) for w in windows] + + +def write_csv_evidence(out_path: Path, reports: Sequence[OutageReport]) -> Path: + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = csv.writer(fh) + writer.writerow( + [ + "first_missing_frame", + "last_missing_frame", + "length_frames", + "onset_ms", + "duration_ms", + "statustext_offset_ms", + "dead_reckoned_count", + "ekf_divergence_count", + "passes_min_length", + "passes_statustext", + "passes_dead_reckoned", + "passes_ekf", + "passes", + ] + ) + for r in reports: + writer.writerow( + [ + r.window.first_missing_frame_idx, + r.window.last_missing_frame_idx, + r.window.length_frames, + r.window.onset_monotonic_ms, + r.window.duration_ms, + "" if r.statustext_offset_ms is None else r.statustext_offset_ms, + r.dead_reckoned_count, + r.ekf_divergence_count, + "true" if r.passes_min_length else "false", + "true" if r.passes_statustext else "false", + "true" if 
r.passes_dead_reckoned else "false", + "true" if r.passes_ekf else "false", + "true" if r.passes else "false", + ] + ) + return out_path diff --git a/e2e/runner/helpers/outlier_tolerance_evaluator.py b/e2e/runner/helpers/outlier_tolerance_evaluator.py new file mode 100644 index 0000000..c90b7aa --- /dev/null +++ b/e2e/runner/helpers/outlier_tolerance_evaluator.py @@ -0,0 +1,261 @@ +"""Outlier-tolerance evaluation for FT-N-01 (AZ-424 / AC-3.1). + +Consumes the AZ-408 ``outlier`` injector's ``manifest.csv`` (which +frames were replaced + the geodesic offset) and the SUT's outbound +estimate stream, and validates: + +* AC-1: at least ``MIN_OUTLIER_COUNT`` outlier frames were injected + over the replay. +* AC-2: for every outlier event, + ``error_after_outlier ≤ error_before_outlier + DRIFT_BUDGET_M``. +* AC-3: ``cov_semi_major_m`` is non-decreasing across the 3-frame + window centred on the outlier (frame before, outlier, frame after). + +The injector's ``geodesic_offset_m`` column verifies the +RESTRICT-CAM-1 / AC-3.1 threshold (>350 m) per-row — the AC-1 count +check here is a coarser invariant that does not duplicate the +per-row geodesic gate. + +Public-boundary discipline: does NOT import any +``src/gps_denied_onboard`` symbol. 
+""" + +from __future__ import annotations + +import csv +from dataclasses import dataclass +from pathlib import Path +from typing import Sequence + +from .geo import distance_m + +DRIFT_BUDGET_M = 50.0 # AC-2 +COVARIANCE_WINDOW_FRAMES = 3 # AC-3: 1 before + 1 outlier + 1 after +MIN_OUTLIER_COUNT = 10 # AC-1: ~10 over Derkachi 8-min replay + + +@dataclass(frozen=True) +class GtPose: + """One ground-truth pose for a video frame, keyed by frame index.""" + + frame_idx: int + lat_deg: float + lon_deg: float + + +@dataclass(frozen=True) +class OutboundEstimate: + """One outbound estimate with covariance + label, keyed by frame index.""" + + frame_idx: int + monotonic_ms: int + lat_deg: float + lon_deg: float + cov_semi_major_m: float + source_label: str + + +@dataclass(frozen=True) +class OutlierEvent: + """One row from the injector's manifest.csv.""" + + frame_idx: int + geodesic_offset_m: float + src_jpeg_path: str + + +@dataclass(frozen=True) +class OutlierEventReport: + """AC-2 + AC-3 evaluation for one outlier event.""" + + frame_idx: int + error_before_m: float | None + error_outlier_m: float | None + error_after_m: float | None + drift_m: float | None # error_after - error_before; AC-2 budget + cov_before: float | None + cov_outlier: float | None + cov_after: float | None + cov_non_decreasing: bool + + @property + def passes_drift(self) -> bool: + return ( + self.drift_m is not None + and self.drift_m <= DRIFT_BUDGET_M + ) + + @property + def passes_covariance(self) -> bool: + return self.cov_non_decreasing + + @property + def passes(self) -> bool: + return self.passes_drift and self.passes_covariance + + +@dataclass(frozen=True) +class OutlierToleranceReport: + """Aggregate report for all outlier events in the replay.""" + + events: tuple[OutlierEventReport, ...] 
+ total_outliers: int + + @property + def passes_count(self) -> bool: + return self.total_outliers >= MIN_OUTLIER_COUNT + + @property + def failed_event_count(self) -> int: + return sum(1 for e in self.events if not e.passes) + + @property + def passes(self) -> bool: + return self.passes_count and self.failed_event_count == 0 + + +def load_outlier_manifest(manifest_path: Path) -> list[OutlierEvent]: + """Read ``outlier/manifest.csv`` into typed events. + + Schema (AZ-408): ``frame_idx, src_jpeg_path, replacement_tile_x, + replacement_tile_y, geodesic_offset_m, seed``. + """ + if not manifest_path.exists(): + raise FileNotFoundError( + f"outlier manifest not found: {manifest_path} — run the " + "outlier injector first (AZ-408 / runner/helpers/injector_fixtures)" + ) + events: list[OutlierEvent] = [] + with manifest_path.open() as fh: + reader = csv.DictReader(fh) + required = {"frame_idx", "src_jpeg_path", "geodesic_offset_m"} + missing = required - set(reader.fieldnames or []) + if missing: + raise ValueError( + f"outlier manifest {manifest_path} missing required columns: " + f"{sorted(missing)}" + ) + for row in reader: + events.append( + OutlierEvent( + frame_idx=int(row["frame_idx"]), + geodesic_offset_m=float(row["geodesic_offset_m"]), + src_jpeg_path=row["src_jpeg_path"], + ) + ) + return events + + +def _index_by_frame(estimates: Sequence[OutboundEstimate]) -> dict[int, OutboundEstimate]: + by_frame: dict[int, OutboundEstimate] = {} + for e in estimates: + by_frame[e.frame_idx] = e + return by_frame + + +def _index_gt(gt: Sequence[GtPose]) -> dict[int, GtPose]: + by_frame: dict[int, GtPose] = {} + for g in gt: + by_frame[g.frame_idx] = g + return by_frame + + +def _error_m(est: OutboundEstimate | None, gt: GtPose | None) -> float | None: + if est is None or gt is None: + return None + return distance_m(gt.lat_deg, gt.lon_deg, est.lat_deg, est.lon_deg) + + +def evaluate_event( + event: OutlierEvent, + estimates_by_frame: dict[int, OutboundEstimate], + 
gt_by_frame: dict[int, GtPose], +) -> OutlierEventReport: + """Compute the AC-2 + AC-3 report for one outlier event.""" + before = estimates_by_frame.get(event.frame_idx - 1) + outlier = estimates_by_frame.get(event.frame_idx) + after = estimates_by_frame.get(event.frame_idx + 1) + + gt_before = gt_by_frame.get(event.frame_idx - 1) + gt_outlier = gt_by_frame.get(event.frame_idx) + gt_after = gt_by_frame.get(event.frame_idx + 1) + + err_before = _error_m(before, gt_before) + err_outlier = _error_m(outlier, gt_outlier) + err_after = _error_m(after, gt_after) + + drift: float | None = None + if err_before is not None and err_after is not None: + drift = err_after - err_before + + cov_before = before.cov_semi_major_m if before is not None else None + cov_outlier = outlier.cov_semi_major_m if outlier is not None else None + cov_after = after.cov_semi_major_m if after is not None else None + + covs = [c for c in (cov_before, cov_outlier, cov_after) if c is not None] + cov_non_decreasing = all(covs[i + 1] >= covs[i] for i in range(len(covs) - 1)) + + return OutlierEventReport( + frame_idx=event.frame_idx, + error_before_m=err_before, + error_outlier_m=err_outlier, + error_after_m=err_after, + drift_m=drift, + cov_before=cov_before, + cov_outlier=cov_outlier, + cov_after=cov_after, + cov_non_decreasing=cov_non_decreasing, + ) + + +def evaluate( + events: Sequence[OutlierEvent], + estimates: Sequence[OutboundEstimate], + gt: Sequence[GtPose], +) -> OutlierToleranceReport: + """Aggregate report across all outlier events.""" + by_frame = _index_by_frame(estimates) + gt_idx = _index_gt(gt) + reports = tuple(evaluate_event(ev, by_frame, gt_idx) for ev in events) + return OutlierToleranceReport(events=reports, total_outliers=len(events)) + + +def write_csv_evidence(out_path: Path, report: OutlierToleranceReport) -> Path: + """Write per-event FT-N-01 evidence CSV.""" + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", newline="") as fh: + writer = 
csv.writer(fh) + writer.writerow( + [ + "frame_idx", + "error_before_m", + "error_outlier_m", + "error_after_m", + "drift_m", + "cov_before", + "cov_outlier", + "cov_after", + "cov_non_decreasing", + "passes_drift", + "passes_covariance", + "passes", + ] + ) + for e in report.events: + writer.writerow( + [ + e.frame_idx, + "" if e.error_before_m is None else f"{e.error_before_m:.3f}", + "" if e.error_outlier_m is None else f"{e.error_outlier_m:.3f}", + "" if e.error_after_m is None else f"{e.error_after_m:.3f}", + "" if e.drift_m is None else f"{e.drift_m:.3f}", + "" if e.cov_before is None else f"{e.cov_before:.3f}", + "" if e.cov_outlier is None else f"{e.cov_outlier:.3f}", + "" if e.cov_after is None else f"{e.cov_after:.3f}", + "true" if e.cov_non_decreasing else "false", + "true" if e.passes_drift else "false", + "true" if e.passes_covariance else "false", + "true" if e.passes else "false", + ] + ) + return out_path diff --git a/e2e/tests/negative/test_ft_n_01_outlier_tolerance.py b/e2e/tests/negative/test_ft_n_01_outlier_tolerance.py new file mode 100644 index 0000000..c729e12 --- /dev/null +++ b/e2e/tests/negative/test_ft_n_01_outlier_tolerance.py @@ -0,0 +1,170 @@ +"""FT-N-01 — 350 m outlier injection tolerance (AZ-424 / AC-3.1). + +Replays the Derkachi flight with the AZ-408 ``outlier`` injector at +``--density medium`` and verifies AC-1 / AC-2 / AC-3 via +``runner.helpers.outlier_tolerance_evaluator``. + +Gated on the same upstream replay helpers as FT-N-02 / FT-P-07 +(``frame_source_replay``, ``fdr_reader``, ``imu_replay``). When those +helpers are still stubbed (current state under AZ-441 / AZ-407 +leftovers), the scenario test skips while +``e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py`` covers +the pure-logic AC-2 / AC-3 invariants. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from fixtures.injectors.outlier import OutlierInjectionReport +from runner.helpers import outlier_tolerance_evaluator as ote + + +@pytest.fixture(scope="module") +def _harness_helpers_implemented() -> bool: + from runner.helpers import fdr_reader, imu_replay + from runner.helpers.frame_source_replay import FrameSourceReplayer + + try: + replayer = FrameSourceReplayer(sink=_NullSink()) # type: ignore[arg-type] + try: + replayer.replay_video(Path("/tmp/non-existent.mp4")) + except NotImplementedError: + return False + try: + list(fdr_reader.iter_records(Path("/tmp/non-existent"))) + except NotImplementedError: + return False + try: + imu_replay.ImuReplayer(emitter=_NullImuEmitter()).replay(Path("/tmp/non-existent.csv")) # type: ignore[arg-type] + except NotImplementedError: + return False + return True + except Exception: + return False + + +class _NullSink: + def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: + return None + + +class _NullImuEmitter: + def emit(self, sample: object) -> None: + return None + + +@pytest.mark.parametrize( + "outlier_injection_derkachi", + [{"density": "medium", "seed": 0}], + indirect=True, +) +@pytest.mark.traces_to("AC-3.1,AC-1,AC-2,AC-3,AC-4") +def test_ft_n_01_outlier_tolerance( + fc_adapter: str, + vio_strategy: str, + outlier_injection_derkachi: OutlierInjectionReport, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + _harness_helpers_implemented: bool, +) -> None: + if not _harness_helpers_implemented: + pytest.skip( + "FT-N-01 full replay requires runner.helpers.{frame_source_replay," + "fdr_reader,imu_replay} — currently AZ-441 / AZ-407 leftovers. " + "AC-1/AC-2/AC-3 helper logic covered by " + "e2e/_unit_tests/helpers/test_outlier_tolerance_evaluator.py." 
+ ) + + from runner.helpers import fdr_reader + from runner.helpers.frame_source_replay import FrameSourceReplayer + + # 1. AC-1 — load injection plan (outlier event frames + offsets). + manifest_path = outlier_injection_derkachi.out_root / "manifest.csv" + events = ote.load_outlier_manifest(manifest_path) + assert len(events) >= ote.MIN_OUTLIER_COUNT, ( + f"AC-1: medium-density injection must produce ≥{ote.MIN_OUTLIER_COUNT} " + f"outliers (got {len(events)} from {manifest_path})" + ) + + # 2. Drive replay against the injected frames directory. + FrameSourceReplayer(_resolve_frame_sink()).replay_video( + outlier_injection_derkachi.out_root / "frames" + ) + + # 3. Collect outbound estimates + GT from FDR + tile cache. + fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" + estimates: list[ote.OutboundEstimate] = [] + for rec in fdr_reader.iter_records(fdr_root): + if rec.record_type != "outbound_estimate": + continue + payload = rec.payload + estimates.append( + ote.OutboundEstimate( + frame_idx=int(payload["frame_idx"]), # type: ignore[arg-type] + monotonic_ms=int(rec.monotonic_ms), + lat_deg=float(payload["lat_deg"]), # type: ignore[arg-type] + lon_deg=float(payload["lon_deg"]), # type: ignore[arg-type] + cov_semi_major_m=float(payload["cov_semi_major_m"]), # type: ignore[arg-type] + source_label=str(payload["source_label"]), # type: ignore[arg-type] + ) + ) + gt: list[ote.GtPose] = _resolve_gt_per_frame(outlier_injection_derkachi) + + if not estimates: + pytest.fail("FT-N-01: no outbound_estimate records produced") + + # 4. Evaluate per outlier event. + report = ote.evaluate(events, estimates, gt) + out_csv = Path(evidence_dir) / f"ft-n-01-{fc_adapter}-{vio_strategy}.csv" + ote.write_csv_evidence(out_csv, report) + + # 5. NFR + AC assertions. 
+ nfr_recorder.record_metric( + "ft_n_01.total_outliers", float(report.total_outliers), ac_id="AC-1" + ) + nfr_recorder.record_metric( + "ft_n_01.failed_event_count", float(report.failed_event_count), ac_id="AC-2" + ) + for e in report.events: + if e.drift_m is not None: + nfr_recorder.record_metric( + f"ft_n_01.event_{e.frame_idx}.drift_m", e.drift_m, ac_id="AC-2" + ) + nfr_recorder.record_metric( + f"ft_n_01.event_{e.frame_idx}.cov_non_decreasing", + 1.0 if e.cov_non_decreasing else 0.0, + ac_id="AC-3", + ) + + assert report.passes_count, ( + f"AC-1: ≥{ote.MIN_OUTLIER_COUNT} outliers required; " + f"got {report.total_outliers}" + ) + for e in report.events: + assert e.passes_drift, ( + f"AC-2 (drift ≤ {ote.DRIFT_BUDGET_M} m) failed at frame " + f"{e.frame_idx}: drift_m={e.drift_m}, " + f"error_before={e.error_before_m}, error_after={e.error_after_m}" + ) + assert e.passes_covariance, ( + f"AC-3 (cov_semi_major_m non-decreasing across window) failed at " + f"frame {e.frame_idx}: " + f"cov_before={e.cov_before}, cov_outlier={e.cov_outlier}, " + f"cov_after={e.cov_after}" + ) + + +def _resolve_frame_sink(): # type: ignore[no-untyped-def] + raise NotImplementedError( + "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay" + ) + + +def _resolve_gt_per_frame(report: OutlierInjectionReport) -> list[ote.GtPose]: + raise NotImplementedError( + "Per-frame GT resolution is owned by AZ-407 / runner.helpers.tile_cache_gt" + ) diff --git a/e2e/tests/negative/test_ft_n_03_outage_reloc.py b/e2e/tests/negative/test_ft_n_03_outage_reloc.py new file mode 100644 index 0000000..410f86f --- /dev/null +++ b/e2e/tests/negative/test_ft_n_03_outage_reloc.py @@ -0,0 +1,201 @@ +"""FT-N-03 — Extended outage triggers operator re-loc request (AZ-425 / AC-3.4). 
+ +Replays the Derkachi flight with a 3-consecutive-frame failure injector +(a thin extension of the AZ-408 outlier injector that emits all-zero +frames instead of crops) and verifies AC-1..AC-4 via +``runner.helpers.outage_request_evaluator``. + +Gated on the same upstream replay helpers as FT-N-01 / FT-N-02 / FT-P-07 +(``frame_source_replay``, ``fdr_reader``, ``imu_replay``, mavproxy +``.tlog`` capture, SITL state read). When those helpers are still +stubbed, the scenario test skips while +``e2e/_unit_tests/helpers/test_outage_request_evaluator.py`` covers the +AC-1..AC-4 evaluator logic. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from runner.helpers import outage_request_evaluator as ore + +DERKACHI_DIR = ( + Path(__file__).resolve().parents[3] + / "_docs" + / "00_problem" + / "input_data" + / "flight_derkachi" +) +DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" + + +@pytest.fixture(scope="module") +def _harness_helpers_implemented() -> bool: + from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer + from runner.helpers.frame_source_replay import FrameSourceReplayer + + try: + replayer = FrameSourceReplayer(sink=_NullSink()) # type: ignore[arg-type] + try: + replayer.replay_video(Path("/tmp/non-existent.mp4")) + except NotImplementedError: + return False + try: + list(fdr_reader.iter_records(Path("/tmp/non-existent"))) + except NotImplementedError: + return False + try: + list(mavproxy_tlog_reader.iter_messages(Path("/tmp/non-existent.tlog"))) + except NotImplementedError: + return False + try: + sitl_observer.read_ekf_divergence_events() # type: ignore[attr-defined] + except (AttributeError, NotImplementedError): + return False + return True + except Exception: + return False + + +class _NullSink: + def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: + return None + + +@pytest.mark.traces_to("AC-3.4,AC-1,AC-2,AC-3,AC-4,AC-5") +def test_ft_n_03_outage_reloc( + fc_adapter: 
str, + vio_strategy: str, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + _harness_helpers_implemented: bool, +) -> None: + if not _harness_helpers_implemented: + pytest.skip( + "FT-N-03 full replay requires runner.helpers.{frame_source_replay," + "fdr_reader,mavproxy_tlog_reader,sitl_observer} — currently " + "AZ-441 / AZ-407 / AZ-416 leftovers. AC-1..AC-4 evaluator logic " + "covered by e2e/_unit_tests/helpers/test_outage_request_evaluator.py." + ) + + from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer + from runner.helpers.frame_source_replay import FrameSourceReplayer + + # 1. Build / locate the 3-frame outage injection fixture. + injected_frames_dir = _resolve_outage_injection_frames() + + # 2. Drive replay. + FrameSourceReplayer(_resolve_frame_sink()).replay_video(injected_frames_dir) + + # 3. Collect outbound estimates + STATUSTEXT + EKF events. + fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" + estimates: list[ore.OutboundEstimateSample] = [] + expected_frame_indices: list[int] = [] + for rec in fdr_reader.iter_records(fdr_root): + if rec.record_type == "frame_received": + expected_frame_indices.append(int(rec.payload["frame_idx"])) # type: ignore[arg-type] + elif rec.record_type == "outbound_estimate": + payload = rec.payload + estimates.append( + ore.OutboundEstimateSample( + frame_idx=int(payload["frame_idx"]), # type: ignore[arg-type] + monotonic_ms=int(rec.monotonic_ms), + source_label=str(payload["source_label"]), # type: ignore[arg-type] + ) + ) + + tlog_path = Path(evidence_dir).parent / f"run-{run_id}" / "mavproxy.tlog" + statustexts = [ + ore.StatustextSample( + monotonic_ms=int(m.timestamp_us // 1000), + text=str(m.fields.get("text", "")), + ) + for m in mavproxy_tlog_reader.iter_messages(tlog_path) + if m.msg_type == "STATUSTEXT" + ] + ekf_events = [ + ore.EkfDivergenceEvent( + monotonic_ms=int(ev.monotonic_ms), reason=str(ev.reason) + ) + for 
ev in sitl_observer.read_ekf_divergence_events() # type: ignore[attr-defined] + ] + + # 4. Evaluate. + reports = ore.evaluate( + expected_frame_indices, + estimates, + statustexts, + ekf_events, + frame_period_ms=_resolve_frame_period_ms(), + ) + out_csv = evidence_dir / f"ft-n-03-{fc_adapter}-{vio_strategy}.csv" + ore.write_csv_evidence(out_csv, reports) + + # 5. NFR metrics + AC assertions. + assert reports, "FT-N-03: at least one outage window must be detected (AC-1)" + for r in reports: + nfr_recorder.record_metric( + f"ft_n_03.window_{r.window.first_missing_frame_idx}.length_frames", + float(r.window.length_frames), + ac_id="AC-1", + ) + if r.statustext_offset_ms is not None: + nfr_recorder.record_metric( + f"ft_n_03.window_{r.window.first_missing_frame_idx}.statustext_offset_ms", + float(r.statustext_offset_ms), + ac_id="AC-2", + ) + nfr_recorder.record_metric( + f"ft_n_03.window_{r.window.first_missing_frame_idx}.dead_reckoned_count", + float(r.dead_reckoned_count), + ac_id="AC-3", + ) + nfr_recorder.record_metric( + f"ft_n_03.window_{r.window.first_missing_frame_idx}.ekf_divergence_count", + float(r.ekf_divergence_count), + ac_id="AC-4", + ) + + for r in reports: + assert r.passes_min_length, ( + f"AC-1: outage window {r.window.first_missing_frame_idx}-" + f"{r.window.last_missing_frame_idx} is shorter than " + f"{ore.MIN_OUTAGE_FRAMES} frames" + ) + assert r.passes_statustext, ( + f"AC-2: '{ore.STATUSTEXT_REGEX}' STATUSTEXT not within " + f"{int(ore.OUTAGE_THRESHOLD_S * 1000)} ±{int(ore.TOLERANCE_S * 1000)} ms " + f"of outage onset at frame {r.window.first_missing_frame_idx} " + f"(observed offset={r.statustext_offset_ms} ms)" + ) + assert r.passes_dead_reckoned, ( + f"AC-3: no `dead_reckoned` estimate emitted during outage " + f"window starting at frame {r.window.first_missing_frame_idx}" + ) + assert r.passes_ekf, ( + f"AC-4: EKF divergence event(s) observed during outage " + f"window starting at frame {r.window.first_missing_frame_idx} " + 
f"(count={r.ekf_divergence_count})" + ) + + +def _resolve_outage_injection_frames() -> Path: + raise NotImplementedError( + "3-frame outage injector is owned by AZ-408 extension / " + "fixtures/injectors/outlier.py (--all-zero variant)" + ) + + +def _resolve_frame_sink(): # type: ignore[no-untyped-def] + raise NotImplementedError( + "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay" + ) + + +def _resolve_frame_period_ms() -> int: + raise NotImplementedError( + "Frame period resolution is owned by AZ-441 / runner.helpers.frame_source_replay" + ) diff --git a/e2e/tests/negative/test_ft_n_04_blackout_spoof.py b/e2e/tests/negative/test_ft_n_04_blackout_spoof.py new file mode 100644 index 0000000..01512d3 --- /dev/null +++ b/e2e/tests/negative/test_ft_n_04_blackout_spoof.py @@ -0,0 +1,267 @@ +"""FT-N-04 — Visual blackout + spoofed GPS combined failsafe (AZ-426 / AC-3.5, AC-NEW-8). + +Three sub-cases (5 s / 15 s / 35 s) at the ladder of windows +prescribed by AC-3.5 + AC-NEW-8, replayed via the AZ-408 +``blackout_spoof`` injector + the FC-inbound spoof proxy, and +validated by ``runner.helpers.blackout_spoof_evaluator``. + +Gated on the same upstream replay helpers as the other negative +scenarios (``frame_source_replay``, ``fdr_reader``, +``mavproxy_tlog_reader``, ``sitl_observer``, ``fc_proxy`` runtime +binding). When those helpers are still stubbed the scenario test +skips while +``e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py`` covers +the AC-1..AC-8 evaluator logic. 
+""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from fixtures.injectors.blackout_spoof import BlackoutSpoofReport +from runner.helpers import blackout_spoof_evaluator as bse + +_WINDOW_LADDER_S = (5.0, 15.0, 35.0) + + +@pytest.fixture(scope="module") +def _harness_helpers_implemented() -> bool: + from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer + from runner.helpers.frame_source_replay import FrameSourceReplayer + + try: + replayer = FrameSourceReplayer(sink=_NullSink()) # type: ignore[arg-type] + try: + replayer.replay_video(Path("/tmp/non-existent.mp4")) + except NotImplementedError: + return False + try: + list(fdr_reader.iter_records(Path("/tmp/non-existent"))) + except NotImplementedError: + return False + try: + list(mavproxy_tlog_reader.iter_messages(Path("/tmp/non-existent.tlog"))) + except NotImplementedError: + return False + try: + sitl_observer.read_gps_health_samples() # type: ignore[attr-defined] + sitl_observer.read_consistency_check_events() # type: ignore[attr-defined] + except (AttributeError, NotImplementedError): + return False + return True + except Exception: + return False + + +class _NullSink: + def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: + return None + + +@pytest.mark.parametrize( + "blackout_spoof_derkachi", + [{"window_seconds": s, "seed": 0} for s in _WINDOW_LADDER_S], + indirect=True, + ids=[f"{int(s)}s" for s in _WINDOW_LADDER_S], +) +@pytest.mark.traces_to( + "AC-3.5,AC-NEW-8,AC-1,AC-2,AC-3,AC-4,AC-5,AC-6,AC-7,AC-8,AC-9" +) +def test_ft_n_04_blackout_spoof( + fc_adapter: str, + vio_strategy: str, + blackout_spoof_derkachi: BlackoutSpoofReport, + evidence_dir, # type: ignore[no-untyped-def] + run_id: str, + nfr_recorder, # type: ignore[no-untyped-def] + _harness_helpers_implemented: bool, +) -> None: + if not _harness_helpers_implemented: + pytest.skip( + "FT-N-04 full replay requires runner.helpers.{frame_source_replay," + 
"fdr_reader,mavproxy_tlog_reader,sitl_observer,fc_proxy} — currently " + "AZ-441 / AZ-407 / AZ-416 leftovers. AC-1..AC-8 evaluator logic " + "covered by e2e/_unit_tests/helpers/test_blackout_spoof_evaluator.py." + ) + + from runner.helpers import fdr_reader, mavproxy_tlog_reader, sitl_observer + from runner.helpers.frame_source_replay import FrameSourceReplayer + + schedule = blackout_spoof_derkachi.schedule + window = bse.BlackoutWindow( + onset_monotonic_ms=schedule.window_start_ms, + end_monotonic_ms=schedule.window_end_ms, + ) + is_35s = abs(window.duration_s - 35.0) < 0.5 + + # 1. Drive replay (frames + paired fc-proxy spoof injection). + FrameSourceReplayer(_resolve_frame_sink()).replay_video( + blackout_spoof_derkachi.out_root / "frames" + ) + _drive_fc_proxy(blackout_spoof_derkachi.out_root / "schedule.json") + + # 2. Collect FDR estimates + spoof-rejected events. + fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" + estimates: list[bse.OutboundEstimateSample] = [] + spoof_events: list[bse.SpoofRejectedEvent] = [] + for rec in fdr_reader.iter_records(fdr_root): + if rec.record_type == "outbound_estimate": + p = rec.payload + estimates.append( + bse.OutboundEstimateSample( + monotonic_ms=int(rec.monotonic_ms), + source_label=str(p["source_label"]), # type: ignore[arg-type] + cov_semi_major_m=float(p["cov_semi_major_m"]), # type: ignore[arg-type] + horiz_accuracy=float(p.get("horiz_accuracy", p["cov_semi_major_m"])), # type: ignore[arg-type] + fix_type=int(p.get("fix_type", -1)), # type: ignore[arg-type] + ) + ) + elif rec.record_type == "spoof_rejected": + spoof_events.append( + bse.SpoofRejectedEvent( + monotonic_ms=int(rec.monotonic_ms), + reason=str(rec.payload.get("reason", "")), # type: ignore[arg-type] + ) + ) + + # 3. Collect STATUSTEXTs from mavproxy tlog. 
+ tlog_path = Path(evidence_dir).parent / f"run-{run_id}" / "mavproxy.tlog" + statustexts = [ + bse.StatustextSample( + monotonic_ms=int(m.timestamp_us // 1000), + text=str(m.fields.get("text", "")), + ) + for m in mavproxy_tlog_reader.iter_messages(tlog_path) + if m.msg_type == "STATUSTEXT" + ] + + # 4. Collect FC-side GPS health + consistency-check events (recovery gate). + gps_health = [ + bse.GpsHealthSample( + monotonic_ms=int(s.monotonic_ms), + healthy=bool(s.healthy), + spoofed=bool(s.spoofed), + ) + for s in sitl_observer.read_gps_health_samples() # type: ignore[attr-defined] + ] + consistency = [ + bse.ConsistencyCheckEvent( + monotonic_ms=int(c.monotonic_ms), passed=bool(c.passed) + ) + for c in sitl_observer.read_consistency_check_events() # type: ignore[attr-defined] + ] + + # 5. Evaluate. + report = bse.evaluate( + window, + estimates=estimates, + statustexts=statustexts, + spoof_events=spoof_events, + gps_health=gps_health, + consistency_checks=consistency, + frame_period_ms=_resolve_frame_period_ms(), + is_35s_window=is_35s, + ) + out_csv = ( + evidence_dir + / f"ft-n-04-{int(window.duration_s)}s-{fc_adapter}-{vio_strategy}.csv" + ) + bse.write_csv_evidence(out_csv, report) + + # 6. NFR metrics + AC assertions. 
+ nfr_recorder.record_metric( + f"ft_n_04.{int(window.duration_s)}s.switch_latency_ms", + float(report.switch_latency.first_dead_reckoned_offset_ms or 0), + ac_id="AC-1", + ) + nfr_recorder.record_metric( + f"ft_n_04.{int(window.duration_s)}s.spoof_rejected_count", + float(report.spoof_rejection.spoof_rejected_count), + ac_id="AC-2", + ) + nfr_recorder.record_metric( + f"ft_n_04.{int(window.duration_s)}s.honest_accuracy_violation_count", + float(report.honest_accuracy.violation_count), + ac_id="AC-4", + ) + if report.statustext_rate.observed_hz is not None: + nfr_recorder.record_metric( + f"ft_n_04.{int(window.duration_s)}s.statustext_imu_only_hz", + report.statustext_rate.observed_hz, + ac_id="AC-5", + ) + if is_35s: + nfr_recorder.record_metric( + "ft_n_04.35s.cov2d_at_ms", + float(report.escalation.cov2d_crossed_at_ms or 0), + ac_id="AC-6", + ) + nfr_recorder.record_metric( + "ft_n_04.35s.failsafe_trigger_at_ms", + float(report.escalation.cov500_or_30s_crossed_at_ms or 0), + ac_id="AC-7", + ) + + assert report.switch_latency.passes, ( + f"AC-1: dead_reckoned label not observed within {bse.SWITCH_LATENCY_MS} ms / " + f"1 frame of blackout onset; " + f"offset={report.switch_latency.first_dead_reckoned_offset_ms} ms, " + f"frame_period={report.switch_latency.frame_period_ms} ms" + ) + assert report.spoof_rejection.passes, ( + f"AC-2: spoof rejection failed; " + f"rejected_count={report.spoof_rejection.spoof_rejected_count}, " + f"re_anchored_count={report.spoof_rejection.satellite_anchored_inside_window}" + ) + assert report.covariance_monotonic.passes, ( + f"AC-3: cov_semi_major_m decreased at " + f"{report.covariance_monotonic.first_decreasing_at_ms} ms" + ) + assert report.honest_accuracy.passes, ( + f"AC-4: horiz_accuracy under-reporting " + f"({report.honest_accuracy.violation_count} violations of " + f"{report.honest_accuracy.sample_count} samples)" + ) + assert report.statustext_rate.passes, ( + f"AC-5: VISUAL_BLACKOUT_IMU_ONLY rate " +
f"{report.statustext_rate.observed_hz} Hz outside " + f"[{bse.STATUSTEXT_RATE_MIN_HZ}, {bse.STATUSTEXT_RATE_MAX_HZ}] Hz" + ) + if is_35s: + assert report.escalation.passes_ac6, ( + f"AC-6: fix_type not degraded after cov crossed " + f"{bse.ESCALATION_COV_2D_M} m at " + f"{report.escalation.cov2d_crossed_at_ms} ms" + ) + assert report.escalation.passes_ac7, ( + f"AC-7: failsafe escalation incomplete; " + f"horiz_999={report.escalation.horiz_accuracy_999}, " + f"failsafe_statustext_offset_ms=" + f"{report.escalation.failsafe_statustext_offset_ms}" + ) + assert report.recovery_gate.passes, ( + f"AC-8: recovery gate failed; " + f"recovery_at_ms={report.recovery_gate.recovery_at_ms}, " + f"stable_period_s={report.recovery_gate.stable_period_s}, " + f"consistency_check_passed={report.recovery_gate.consistency_check_passed}" + ) + + +def _resolve_frame_sink(): # type: ignore[no-untyped-def] + raise NotImplementedError( + "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay" + ) + + +def _drive_fc_proxy(schedule_path: Path) -> None: + raise NotImplementedError( + "FC-inbound spoof proxy driver is owned by AZ-441 / runner.helpers.fc_proxy_runtime" + ) + + +def _resolve_frame_period_ms() -> int: + raise NotImplementedError( + "Frame period resolution is owned by AZ-441 / runner.helpers.frame_source_replay" + )