"""Unit tests for ``runner.helpers.msp_frame_observer`` (FT-P-09-iNav / AZ-417). Covers AC-2 (≥4.5 Hz observed for 5 Hz target) and AC-3 (fix_type ≥3, provider=MSP, numSat matches emitted value). """ from __future__ import annotations import pytest from runner.helpers.msp_frame_observer import ( DEFAULT_TARGET_RATE_HZ, MIN_FIX_TYPE, MIN_OBSERVED_RATE_HZ, MSP2_SENSOR_GPS_FUNCTION_ID, REQUIRED_PROVIDER, InavGpsSnapshot, MspFrameSample, compute_rate_hz, count_frames_by_id, evaluate_inav_gps_state, ) def _frames(rate_hz: float, n: int, function_id: int = MSP2_SENSOR_GPS_FUNCTION_ID) -> list[MspFrameSample]: """Synthetic frame stream at exactly ``rate_hz`` for ``n`` frames.""" if rate_hz <= 0: raise ValueError("rate_hz must be > 0") period_ms = int(round(1000.0 / rate_hz)) return [ MspFrameSample(monotonic_ms=i * period_ms, function_id=function_id) for i in range(n) ] def test_constants_match_spec() -> None: """The AC-2/AC-3 thresholds + IDs must match the spec text.""" # Assert assert MSP2_SENSOR_GPS_FUNCTION_ID == 0x1F03 assert DEFAULT_TARGET_RATE_HZ == 5.0 assert MIN_OBSERVED_RATE_HZ == 4.5 assert MIN_FIX_TYPE == 3 assert REQUIRED_PROVIDER == "MSP" def test_count_frames_by_id_filters_correctly() -> None: """Mixed-ID stream tallies per function ID.""" # Arrange samples = [ MspFrameSample(0, MSP2_SENSOR_GPS_FUNCTION_ID), MspFrameSample(100, 0x1F04), MspFrameSample(200, MSP2_SENSOR_GPS_FUNCTION_ID), MspFrameSample(300, MSP2_SENSOR_GPS_FUNCTION_ID), ] # Act counts = count_frames_by_id(samples) # Assert assert counts[MSP2_SENSOR_GPS_FUNCTION_ID] == 3 assert counts[0x1F04] == 1 def test_compute_rate_at_target_passes() -> None: """5 Hz over 60 s window passes the ≥4.5 Hz minimum.""" # Arrange — 60s at 5Hz = 301 samples (inclusive of t=0 and t=60000). samples = _frames(rate_hz=5.0, n=301) # Act report = compute_rate_hz(samples) # Assert assert report.frame_count == 301 assert report.observed_rate_hz == pytest.approx(5.0, abs=0.01) assert report.passes is True def test_compute_rate_at_boundary_passes() -> None: """Exactly 4.5 Hz passes (boundary is inclusive).""" # Arrange samples = _frames(rate_hz=4.5, n=46) # 10s @ 4.5Hz # Act report = compute_rate_hz(samples) # Assert assert report.observed_rate_hz == pytest.approx(4.5, abs=0.05) assert report.passes is True def test_compute_rate_below_minimum_fails() -> None: """3 Hz observed → fails the ≥4.5 Hz minimum.""" # Arrange samples = _frames(rate_hz=3.0, n=31) # 10s @ 3Hz # Act report = compute_rate_hz(samples) # Assert assert report.observed_rate_hz == pytest.approx(3.0, abs=0.05) assert report.passes is False def test_compute_rate_zero_samples_does_not_pass() -> None: """Empty input → zero count, zero rate, does not pass.""" # Act report = compute_rate_hz([]) # Assert assert report.frame_count == 0 assert report.window_ms == 0 assert report.observed_rate_hz == 0.0 assert report.passes is False def test_compute_rate_single_sample_does_not_pass() -> None: """One sample yields no window → does not pass.""" # Arrange samples = [MspFrameSample(0, MSP2_SENSOR_GPS_FUNCTION_ID)] # Act report = compute_rate_hz(samples) # Assert assert report.frame_count == 1 assert report.window_ms == 0 assert report.passes is False def test_compute_rate_filters_function_id() -> None: """Frames with a different function_id are ignored in the rate calc.""" # Arrange samples = ( _frames(rate_hz=5.0, n=51, function_id=MSP2_SENSOR_GPS_FUNCTION_ID) + _frames(rate_hz=10.0, n=101, function_id=0x1F04) ) # Act report = compute_rate_hz(samples, function_id=MSP2_SENSOR_GPS_FUNCTION_ID) # Assert assert report.frame_count == 51 assert report.observed_rate_hz == pytest.approx(5.0, abs=0.01) def test_compute_rate_rejects_negative_minimum() -> None: # Act / Assert with pytest.raises(ValueError, match="min_required_hz"): compute_rate_hz([], min_required_hz=-1.0) def test_evaluate_gps_state_passes_at_minimum_fix() -> None: """fix_type=3, provider=MSP, numSat=10 (matches emitted) → AC-3 pass.""" # Arrange snapshot = InavGpsSnapshot(fix_type=3, num_sat=10, provider="MSP") # Act report = evaluate_inav_gps_state(snapshot, expected_num_sat=10) # Assert assert report.fix_type_ok is True assert report.provider_ok is True assert report.num_sat_ok is True assert report.passes is True def test_evaluate_gps_state_fails_on_low_fix_type() -> None: """fix_type=2 < 3 → AC-3 fail.""" # Arrange snapshot = InavGpsSnapshot(fix_type=2, num_sat=10, provider="MSP") # Act report = evaluate_inav_gps_state(snapshot, expected_num_sat=10) # Assert assert report.fix_type_ok is False assert report.passes is False def test_evaluate_gps_state_fails_on_wrong_provider() -> None: """provider != MSP → AC-3 fail (fallback to internal GPS).""" # Arrange snapshot = InavGpsSnapshot(fix_type=3, num_sat=10, provider="INTERNAL") # Act report = evaluate_inav_gps_state(snapshot, expected_num_sat=10) # Assert assert report.provider_ok is False assert report.passes is False def test_evaluate_gps_state_fails_on_num_sat_mismatch() -> None: """numSat reported by iNav must match the value emitted by SUT.""" # Arrange snapshot = InavGpsSnapshot(fix_type=3, num_sat=12, provider="MSP") # Act report = evaluate_inav_gps_state(snapshot, expected_num_sat=10) # Assert assert report.num_sat_ok is False assert report.passes is False def test_evaluate_gps_state_rejects_negative_expected_num_sat() -> None: # Arrange snapshot = InavGpsSnapshot(fix_type=3, num_sat=10, provider="MSP") # Act / Assert with pytest.raises(ValueError, match="expected_num_sat"): evaluate_inav_gps_state(snapshot, expected_num_sat=-1)