"""MSP2 frame observer for FT-P-09-iNav (AZ-417 / AC-4.3). iNav consumes MSP2 over a TCP socket on port 5760. The SUT's ``c8_fc_adapter`` (iNav-side) emits ``MSP2_SENSOR_GPS`` (function ID 0x1F03) frames at a configured cadence (target 5 Hz per AC-2). This helper owns the pure-logic side of FT-P-09-iNav: * ``compute_rate_hz`` — given a sequence of frame-arrival timestamps, return the observed Hz over a window. * ``count_frames_by_id`` — filter + tally per MSP function ID. * ``evaluate_inav_gps_state`` — given a snapshot of iNav's ``gpsSol`` + ``provider`` after replay, assert AC-3 (fix_type ≥ 3, provider = MSP, numSat matches the emitted value). The TCP-probe + actual MSP frame capture path is owned by AZ-407 (``runner.helpers.sitl_observer``) and the iNav SITL docker compose service. This module only consumes already-captured data. Public-boundary discipline: does NOT import any ``src/gps_denied_onboard`` symbol. """ from __future__ import annotations from dataclasses import dataclass from typing import Sequence MSP2_SENSOR_GPS_FUNCTION_ID = 0x1F03 DEFAULT_TARGET_RATE_HZ = 5.0 MIN_OBSERVED_RATE_HZ = 4.5 # AC-2: ≥4.5 Hz observed for 5 Hz target MIN_FIX_TYPE = 3 # AC-3: gpsSol.fixType ≥ 3 REQUIRED_PROVIDER = "MSP" # AC-3: provider=MSP (no fallback to internal GPS) @dataclass(frozen=True) class MspFrameSample: """One MSP frame as captured by the SITL-side observer.""" monotonic_ms: int function_id: int @dataclass(frozen=True) class InavGpsSnapshot: """Snapshot of iNav's ``gpsSol`` + provider state after replay.""" fix_type: int num_sat: int provider: str @dataclass(frozen=True) class RateReport: """Observed rate over a window with pass/fail vs spec target.""" frame_count: int window_ms: int observed_rate_hz: float target_rate_hz: float min_required_hz: float @property def passes(self) -> bool: return ( self.window_ms > 0 and self.observed_rate_hz >= self.min_required_hz ) @dataclass(frozen=True) class InavGpsReport: """Evaluation of iNav GPS state against AC-3.""" snapshot: InavGpsSnapshot expected_num_sat: int fix_type_ok: bool provider_ok: bool num_sat_ok: bool @property def passes(self) -> bool: return self.fix_type_ok and self.provider_ok and self.num_sat_ok def count_frames_by_id(samples: Sequence[MspFrameSample]) -> dict[int, int]: """Tally per MSP function ID.""" counts: dict[int, int] = {} for s in samples: counts[s.function_id] = counts.get(s.function_id, 0) + 1 return counts def compute_rate_hz( samples: Sequence[MspFrameSample], *, function_id: int = MSP2_SENSOR_GPS_FUNCTION_ID, target_rate_hz: float = DEFAULT_TARGET_RATE_HZ, min_required_hz: float = MIN_OBSERVED_RATE_HZ, ) -> RateReport: """Compute observed Hz for the given function_id over the sample window. The window is ``[first_sample.monotonic_ms, last_sample.monotonic_ms]`` inclusive. A window of zero ms (≤1 matching sample) is reported but will not pass. """ if min_required_hz < 0: raise ValueError(f"min_required_hz must be ≥0, got {min_required_hz}") filtered = [s for s in samples if s.function_id == function_id] if len(filtered) < 2: return RateReport( frame_count=len(filtered), window_ms=0, observed_rate_hz=0.0, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) window_ms = filtered[-1].monotonic_ms - filtered[0].monotonic_ms if window_ms <= 0: return RateReport( frame_count=len(filtered), window_ms=window_ms, observed_rate_hz=0.0, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) # Rate = (count - 1) / (window in seconds); the first frame is the # epoch boundary, subsequent frames define the cadence. observed = (len(filtered) - 1) / (window_ms / 1000.0) return RateReport( frame_count=len(filtered), window_ms=window_ms, observed_rate_hz=observed, target_rate_hz=target_rate_hz, min_required_hz=min_required_hz, ) def evaluate_inav_gps_state( snapshot: InavGpsSnapshot, *, expected_num_sat: int, min_fix_type: int = MIN_FIX_TYPE, required_provider: str = REQUIRED_PROVIDER, ) -> InavGpsReport: """Validate AC-3: fix_type ≥3, provider=MSP, numSat matches emitted value.""" if expected_num_sat < 0: raise ValueError(f"expected_num_sat must be ≥0, got {expected_num_sat}") return InavGpsReport( snapshot=snapshot, expected_num_sat=expected_num_sat, fix_type_ok=snapshot.fix_type >= min_fix_type, provider_ok=snapshot.provider == required_provider, num_sat_ok=snapshot.num_sat == expected_num_sat, )