"""ArduPilot contract + signing-handshake evaluation for FT-P-09-AP (AZ-416). Given the captured ``.tlog`` from ``mavproxy-listener`` plus a single EK3_SRC1_POSXY parameter read, this helper validates: * AC-1: signing handshake completes within ≤5 s (``observe_signing_handshake`` — first signed message within the window OR absence of ``BAD_SIGNATURE`` STATUSTEXT during it). * AC-2: GPS_INPUT flow at ≥4.5 Hz over the 60 s replay (``compute_gps_input_rate``). * AC-3: EK3_SRC1_POSXY == 3 (``validate_ek3_src1_posxy`` — pure check on the param value the caller fetched via mavproxy). * AC-4: GPS_RAW_INT health — ``fix_type ≥ 3`` AND ``eph ≤ 200`` (HDOP ≤ 2.0) for ≥80 % of the 60 s window (``evaluate_gps_raw_int_health``). All inputs are pure ``Iterable[TlogMessage]``; the tlog ingestion is delegated to ``runner.helpers.mavproxy_tlog_reader.iter_messages``. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Iterable, Sequence from .mavproxy_tlog_reader import TlogMessage HANDSHAKE_BUDGET_S = 5.0 GPS_INPUT_TARGET_RATE_HZ = 5.0 GPS_INPUT_MIN_RATE_HZ = 4.5 GPS_RAW_INT_MIN_FIX_TYPE = 3 GPS_RAW_INT_MAX_EPH = 200 # HDOP × 100 ≤ 200 → HDOP ≤ 2.0 GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED = 0.80 EK3_SRC1_POSXY_REQUIRED = 3 # AP EKF source-set: 3 = GPS @dataclass(frozen=True) class HandshakeReport: """AC-1: signing-handshake completion observation.""" window_start_us: int window_end_us: int first_signed_us: int | None bad_signature_count: int setup_signing_seen: bool @property def lag_s(self) -> float | None: if self.first_signed_us is None: return None return (self.first_signed_us - self.window_start_us) / 1_000_000.0 @property def passes(self) -> bool: return ( self.first_signed_us is not None and self.lag_s is not None and self.lag_s <= HANDSHAKE_BUDGET_S and self.bad_signature_count == 0 ) @dataclass(frozen=True) class GpsInputRateReport: """AC-2: GPS_INPUT rate over the replay window.""" frame_count: int window_us: int observed_rate_hz: float target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ min_required_hz: float = GPS_INPUT_MIN_RATE_HZ @property def passes(self) -> bool: return ( self.window_us > 0 and self.observed_rate_hz >= self.min_required_hz ) @dataclass(frozen=True) class GpsRawIntHealthReport: """AC-4: GPS_RAW_INT fix_type + eph healthy fraction.""" total_samples: int healthy_samples: int fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED @property def healthy_fraction(self) -> float: if self.total_samples == 0: return 0.0 return self.healthy_samples / self.total_samples @property def passes(self) -> bool: return ( self.total_samples > 0 and self.healthy_fraction >= self.fraction_required ) def observe_signing_handshake( messages: Iterable[TlogMessage], *, handshake_window_us: int = int(HANDSHAKE_BUDGET_S * 1_000_000), ) -> HandshakeReport: """AC-1: first signed message within ``handshake_window_us``. The handshake window starts at the FIRST observed message's timestamp (the SUT cannot be heard from before that). The result PASSES if a signed message arrives within the window AND no ``STATUSTEXT`` with ``BAD_SIGNATURE`` is observed during it. The SETUP_SIGNING handshake exchange itself is unsigned by spec (it's how the key is shared), so its presence is reported but does NOT gate the pass — the gate is the first SIGNED follow-up. """ if handshake_window_us <= 0: raise ValueError(f"handshake_window_us must be > 0, got {handshake_window_us}") window_start: int | None = None window_end: int | None = None first_signed_us: int | None = None bad_sig_count = 0 setup_signing_seen = False for m in messages: if window_start is None: window_start = m.timestamp_us window_end = window_start + handshake_window_us if window_end is not None and m.timestamp_us > window_end: break if m.msg_type == "SETUP_SIGNING": setup_signing_seen = True if m.signed and first_signed_us is None: first_signed_us = m.timestamp_us if m.msg_type == "STATUSTEXT": text = str(m.fields.get("text", "")).upper() if "BAD_SIGNATURE" in text: bad_sig_count += 1 return HandshakeReport( window_start_us=window_start or 0, window_end_us=window_end or 0, first_signed_us=first_signed_us, bad_signature_count=bad_sig_count, setup_signing_seen=setup_signing_seen, ) def compute_gps_input_rate( messages: Iterable[TlogMessage], *, target_rate_hz: float = GPS_INPUT_TARGET_RATE_HZ, min_required_hz: float = GPS_INPUT_MIN_RATE_HZ, ) -> GpsInputRateReport: """AC-2: GPS_INPUT cadence over the entire message stream.""" if min_required_hz < 0: raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}") timestamps = [m.timestamp_us for m in messages if m.msg_type == "GPS_INPUT"] if len(timestamps) < 2: return GpsInputRateReport( frame_count=len(timestamps), window_us=0, observed_rate_hz=0.0, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) window_us = timestamps[-1] - timestamps[0] if window_us <= 0: return GpsInputRateReport( frame_count=len(timestamps), window_us=window_us, observed_rate_hz=0.0, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) observed = (len(timestamps) - 1) / (window_us / 1_000_000.0) return GpsInputRateReport( frame_count=len(timestamps), window_us=window_us, observed_rate_hz=observed, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) def validate_ek3_src1_posxy(value: int) -> bool: """AC-3: EK3_SRC1_POSXY must equal 3 (GPS source).""" return value == EK3_SRC1_POSXY_REQUIRED def evaluate_gps_raw_int_health( messages: Iterable[TlogMessage], *, min_fix_type: int = GPS_RAW_INT_MIN_FIX_TYPE, max_eph: int = GPS_RAW_INT_MAX_EPH, fraction_required: float = GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED, ) -> GpsRawIntHealthReport: """AC-4: ≥``fraction_required`` of GPS_RAW_INT samples must be healthy. A sample is "healthy" iff ``fix_type ≥ min_fix_type`` AND ``eph ≤ max_eph``. Both must hold per the spec text. """ if not 0.0 <= fraction_required <= 1.0: raise ValueError( f"fraction_required must be in [0, 1], got {fraction_required}" ) total = 0 healthy = 0 for m in messages: if m.msg_type != "GPS_RAW_INT": continue total += 1 try: fix_type = int(m.fields["fix_type"]) # type: ignore[arg-type] eph = int(m.fields["eph"]) # type: ignore[arg-type] except (KeyError, TypeError, ValueError): continue if fix_type >= min_fix_type and eph <= max_eph: healthy += 1 return GpsRawIntHealthReport( total_samples=total, healthy_samples=healthy, fraction_required=fraction_required, ) def collect_messages_to_list(messages: Iterable[TlogMessage]) -> list[TlogMessage]: """Materialise an iterator into a list — convenience for multi-pass eval. The scenario reads the tlog once via ``iter_messages`` and runs multiple analyzers over the result. ``iter_messages`` returns a generator that closes its underlying pymavlink connection on exhaustion, so re-iteration is not safe without materialisation. """ return list(messages)