"""FT-P-09-iNav — iNav MSP2_SENSOR_GPS contract conformance (AZ-417 / AC-4.3). The full scenario: 1. Force ``fc_adapter=inav``; start the SUT against the iNav SITL container on ``inav-sitl:5760``. 2. AC-1: probe the TCP connection establishment from the SUT side within ≤5 s (observable via the SITL observer's connection event). 3. Replay 60 s of Derkachi through the SUT. 4. AC-2: count MSP2_SENSOR_GPS (function ID 0x1F03) frame arrivals at iNav; assert ≥4.5 Hz observed. 5. AC-3: query iNav GPS state via ``msp_gps_toy`` subprocess; assert ``gpsSol.fixType ≥ 3``, ``provider = "MSP"``, ``gpsSol.numSat`` matches the emitted value. 6. AC-4: parameterise per ``vio_strategy`` (``fc_adapter`` fixed to ``inav``). Gated on: * ``runner.helpers.frame_source_replay`` — owned by AZ-441 * ``runner.helpers.sitl_observer`` — owned by AZ-407 (iNav probe leg is part of the iNav-side `inav_msp_observer` follow-up) Pure-logic AC-2/AC-3 coverage lives in ``e2e/_unit_tests/helpers/test_msp_frame_observer.py``. """ from __future__ import annotations from pathlib import Path import pytest from runner.helpers import msp_frame_observer as mfo DERKACHI_DIR = ( Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "flight_derkachi" ) DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" REPLAY_WINDOW_S = 60 TCP_HANDSHAKE_BUDGET_S = 5 @pytest.fixture(scope="module") def _inav_harness_implemented() -> bool: """True iff frame_source_replay + sitl_observer iNav leg are real.""" from runner.helpers import sitl_observer from runner.helpers.frame_source_replay import FrameSourceReplayer try: replayer = FrameSourceReplayer(sink=_NullSink()) # type: ignore[arg-type] try: replayer.replay_video(Path("/tmp/non-existent.mp4")) except NotImplementedError: return False try: sitl_observer.observe_inav_tcp_handshake(host="inav-sitl", port=5760, timeout_s=0.01) except (NotImplementedError, AttributeError): return False return True except Exception: return False class _NullSink: def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None: return None @pytest.mark.traces_to("AC-4.3,AC-1,AC-2,AC-3,AC-4") def test_ft_p_09_inav( vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] request, # type: ignore[no-untyped-def] _inav_harness_implemented: bool, ) -> None: """Full FT-P-09-iNav scenario; parameterized per vio_strategy. `fc_adapter` is FORCED to ``inav`` (AC-4) — the test skips on any other adapter so the conftest matrix doesn't double-run it under ``ardupilot``. """ fc_adapter = request.getfixturevalue("fc_adapter") if fc_adapter != "inav": pytest.skip("FT-P-09-iNav is iNav-only; ardupilot variant is FT-P-09-AP (AZ-416)") if not _inav_harness_implemented: pytest.skip( "FT-P-09-iNav full scenario requires runner.helpers.{frame_source_replay," "sitl_observer.observe_inav_tcp_handshake} — currently AZ-441 / AZ-407 leftovers. " "Pure-logic AC-2/AC-3 covered by " "e2e/_unit_tests/helpers/test_msp_frame_observer.py." ) from runner.helpers import sitl_observer from runner.helpers.frame_source_replay import FrameSourceReplayer # 1. AC-1: TCP handshake. handshake = sitl_observer.observe_inav_tcp_handshake( host="inav-sitl", port=5760, timeout_s=TCP_HANDSHAKE_BUDGET_S, ) assert handshake.established_within_s is not None, ( f"AC-1 (TCP connect ≤{TCP_HANDSHAKE_BUDGET_S} s) failed: no connection event" ) assert handshake.established_within_s <= TCP_HANDSHAKE_BUDGET_S, ( f"AC-1 (TCP connect ≤{TCP_HANDSHAKE_BUDGET_S} s) failed: " f"established_within_s={handshake.established_within_s}" ) # 2. Drive replay. FrameSourceReplayer(_resolve_frame_sink()).replay_video(DERKACHI_MP4) # 3. Collect MSP frame arrivals from the iNav observer. capture = sitl_observer.collect_inav_msp_frames( host="inav-sitl", port=5760, window_s=REPLAY_WINDOW_S, ) samples = [ mfo.MspFrameSample(monotonic_ms=int(f.monotonic_ms), function_id=int(f.function_id)) for f in capture.frames ] # 4. AC-2: rate. rate_report = mfo.compute_rate_hz(samples) # 5. AC-3: iNav GPS state via msp_gps_toy. state = sitl_observer.query_inav_gps_state(host="inav-sitl") gps_report = mfo.evaluate_inav_gps_state( mfo.InavGpsSnapshot( fix_type=int(state.fix_type), num_sat=int(state.num_sat), provider=str(state.provider), ), expected_num_sat=int(capture.expected_num_sat), ) # 6. NFR metrics + assertions. nfr_recorder.record_metric( "ft_p_09_inav.frame_count", float(rate_report.frame_count), ac_id="AC-2" ) nfr_recorder.record_metric( "ft_p_09_inav.observed_rate_hz", rate_report.observed_rate_hz, ac_id="AC-2" ) nfr_recorder.record_metric( "ft_p_09_inav.tcp_handshake_s", float(handshake.established_within_s), ac_id="AC-1" ) nfr_recorder.record_metric( "ft_p_09_inav.fix_type", float(gps_report.snapshot.fix_type), ac_id="AC-3" ) assert rate_report.passes, ( f"AC-2 (≥{mfo.MIN_OBSERVED_RATE_HZ} Hz for {mfo.DEFAULT_TARGET_RATE_HZ} Hz target) failed: " f"observed_rate_hz={rate_report.observed_rate_hz:.3f}, " f"frames={rate_report.frame_count}, window_ms={rate_report.window_ms}" ) assert gps_report.passes, ( f"AC-3 failed: fix_type_ok={gps_report.fix_type_ok}, " f"provider_ok={gps_report.provider_ok}, num_sat_ok={gps_report.num_sat_ok}; " f"snapshot={gps_report.snapshot}, expected_num_sat={gps_report.expected_num_sat}" ) def _resolve_frame_sink(): # type: ignore[no-untyped-def] raise NotImplementedError( "frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay" )