"""MAVLink 2.0 signing-rejection evaluator for NFT-SEC-03 (AZ-438 / AC-NEW-11, D-C8-9). For each of the three injection sub-cases — unsigned, signed-with-wrong-key, replayed-from-tlog — AP MUST: * emit a ``BAD_SIGNATURE`` STATUSTEXT within ≤``REJECTION_LATENCY_MS`` (500 ms) of the injected message; * NOT update its ``GLOBAL_POSITION_INT`` from the injected message (i.e. the GPS position remains anchored to whatever the last legitimate emission established). The "rejection STATUSTEXT" regex matches the canonical AP wording (e.g. ``MAVLink: BAD_SIGNATURE``) plus an "equivalent" wildcard that the spec carves out for AP variants that emit a slightly different phrase ("Bad signature received", "signature rejected", etc.) so this evaluator does not lock to one exact build. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations import csv import re from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Sequence REJECTION_LATENCY_MS = 500 POSITION_DRIFT_TOLERANCE_M = 1.0 class SubCase(str, Enum): UNSIGNED = "unsigned" WRONG_KEY = "wrong_key" REPLAYED = "replayed" # Canonical wording + variants observed across AP builds. Matched # case-insensitively. Extending this set is a deliberate decision; the # regression risk is that a future AP build emits a brand-new phrase # and the runner silently treats injections as accepted — guard against # that by surfacing the seen STATUSTEXTs in the CSV evidence row. BAD_SIGNATURE_PATTERNS: tuple[str, ...] = ( r"\bBAD[_\s]?SIGNATURE\b", r"\bsignature\s+rejected\b", r"\bbad\s+signature\s+received\b", ) _BAD_SIGNATURE_RE = re.compile("|".join(BAD_SIGNATURE_PATTERNS), re.IGNORECASE) def is_bad_signature_statustext(text: str) -> bool: """True iff the STATUSTEXT line matches one of the documented rejections.""" return bool(_BAD_SIGNATURE_RE.search(text)) @dataclass(frozen=True) class InjectionEvent: """One runner-issued injection in a sub-case.""" sub_case: SubCase injected_at_ms: int @dataclass(frozen=True) class StatustextSample: monotonic_ms: int text: str @dataclass(frozen=True) class PositionSample: """AP ``GLOBAL_POSITION_INT`` sample, decoded to meters.""" monotonic_ms: int lat_e7: int lon_e7: int def position_drift_m(samples: Sequence[PositionSample], around_ms: int) -> float: """Equirectangular drift (m) between the last sample before and first after ``around_ms``. A tiny budget (1 m by default) tolerates the per-frame jitter the autopilot's own EKF produces; the absolute test is that the drift is NOT on the order of the injected message's lat/lon magnitude (which would be several-meters to kilometers). """ before: PositionSample | None = None after: PositionSample | None = None for s in samples: if s.monotonic_ms <= around_ms: before = s if before is None or s.monotonic_ms > before.monotonic_ms else before elif after is None: after = s break if before is None or after is None: return 0.0 dlat_m = (after.lat_e7 - before.lat_e7) * 1e-7 * 111_320.0 avg_lat_rad = ((after.lat_e7 + before.lat_e7) / 2.0) * 1e-7 * (3.14159265358979 / 180.0) import math dlon_m = (after.lon_e7 - before.lon_e7) * 1e-7 * 111_320.0 * math.cos(avg_lat_rad) return math.hypot(dlat_m, dlon_m) @dataclass(frozen=True) class SubCaseRejectionReport: """One sub-case verdict (AC-2 / AC-3 / AC-4).""" sub_case: SubCase rejection_at_ms: int | None rejection_text: str | None rejection_latency_ms: int | None position_drift_m: float budget_ms: int = REJECTION_LATENCY_MS @property def passes_rejection(self) -> bool: return ( self.rejection_at_ms is not None and self.rejection_latency_ms is not None and self.rejection_latency_ms <= self.budget_ms ) @property def passes_no_position_update(self) -> bool: return self.position_drift_m <= POSITION_DRIFT_TOLERANCE_M @property def passes(self) -> bool: return self.passes_rejection and self.passes_no_position_update @dataclass(frozen=True) class SigningRejectionReport: """Aggregate AC-2 + AC-3 + AC-4 verdict across all sub-cases.""" sub_cases: Sequence[SubCaseRejectionReport] @property def passes(self) -> bool: return all(sc.passes for sc in self.sub_cases) def evaluate_subcase( injection: InjectionEvent, statustexts: Sequence[StatustextSample], positions: Sequence[PositionSample], ) -> SubCaseRejectionReport: """Compute verdict for one (injection, capture) pair.""" rejection_at: int | None = None rejection_text: str | None = None rejection_latency: int | None = None for st in statustexts: if st.monotonic_ms < injection.injected_at_ms: continue if is_bad_signature_statustext(st.text): rejection_at = st.monotonic_ms rejection_text = st.text rejection_latency = st.monotonic_ms - injection.injected_at_ms break drift = position_drift_m(positions, injection.injected_at_ms) return SubCaseRejectionReport( sub_case=injection.sub_case, rejection_at_ms=rejection_at, rejection_text=rejection_text, rejection_latency_ms=rejection_latency, position_drift_m=drift, ) def evaluate( injections: Sequence[InjectionEvent], *, statustexts: Sequence[StatustextSample], positions: Sequence[PositionSample], ) -> SigningRejectionReport: sub_reports: list[SubCaseRejectionReport] = [] for inj in injections: sub_reports.append( evaluate_subcase(inj, statustexts=statustexts, positions=positions) ) return SigningRejectionReport(sub_cases=tuple(sub_reports)) def write_csv_evidence(out_path: Path, report: SigningRejectionReport) -> Path: out_path.parent.mkdir(parents=True, exist_ok=True) with out_path.open("w", newline="") as fh: writer = csv.writer(fh) writer.writerow( [ "sub_case", "rejection_at_ms", "rejection_latency_ms", "rejection_text", "position_drift_m", "passes_rejection", "passes_no_position_update", "passes", ] ) for sc in report.sub_cases: writer.writerow( [ sc.sub_case.value, "" if sc.rejection_at_ms is None else sc.rejection_at_ms, "" if sc.rejection_latency_ms is None else sc.rejection_latency_ms, sc.rejection_text or "", f"{sc.position_drift_m:.4f}", "true" if sc.passes_rejection else "false", "true" if sc.passes_no_position_update else "false", "true" if sc.passes else "false", ] ) return out_path