"""FT-P-09-AP — ArduPilot GPS_INPUT contract + MAVLink 2.0 signing (AZ-416 / AC-4.3).

Scenario, end to end:

1. Force ``fc_adapter=ardupilot``; load ``mavlink-test-passkey.txt`` as the
   docker secret feeding the SUT signing channel.
2. Start the SUT against the ArduPilot SITL container; mavproxy-listener
   captures the wire traffic to a ``.tlog``.
3. AC-1: parse the ``.tlog``; the first signed frame must arrive within ≤5 s
   of the first observed message, with no ``BAD_SIGNATURE`` STATUSTEXT in
   that window.
4. Replay 60 s of Derkachi through the SUT (signed GPS_INPUT flow).
5. AC-2: GPS_INPUT cadence over the full ``.tlog`` ≥4.5 Hz.
6. AC-3: ``EK3_SRC1_POSXY`` (read via mavproxy parameter request) == 3
   (GPS source).
7. AC-4: GPS_RAW_INT health (``fix_type ≥ 3`` AND ``eph ≤ 200``) for ≥80 %
   of the window.
8. AC-5: parameterised per ``vio_strategy`` (``fc_adapter`` fixed to
   ``ardupilot``).

Gated on:

* ``runner.helpers.frame_source_replay`` — owned by AZ-441
* ``runner.helpers.sitl_observer`` — owned by AZ-407 (AP-side leg
  ``capture_ap_tlog`` + ``read_ap_parameter``)

Pure-logic AC-1/AC-2/AC-3/AC-4 coverage lives in
``e2e/_unit_tests/helpers/test_ap_contract_evaluator.py`` and
``e2e/_unit_tests/helpers/test_mavproxy_tlog_reader.py``.
"""

from __future__ import annotations

from pathlib import Path

import pytest

from runner.helpers import ap_contract_evaluator as ace
from runner.helpers import mavproxy_tlog_reader as mtr

# Absolute location of this test module; both fixture paths hang off it.
_HERE = Path(__file__).resolve()

# Derkachi flight input data, three directory levels above this file.
DERKACHI_DIR = _HERE.parents[3].joinpath(
    "_docs", "00_problem", "input_data", "flight_derkachi"
)
DERKACHI_MP4 = DERKACHI_DIR.joinpath("flight_derkachi.mp4")

# On-disk MAVLink signing passkey, two directory levels above this file.
MAVLINK_PASSKEY_FIXTURE = _HERE.parents[2].joinpath(
    "fixtures", "secrets", "mavlink-test-passkey.txt"
)

# Duration (seconds) handed to the AP tlog capture.
REPLAY_WINDOW_S = 60


@pytest.mark.traces_to("AC-4.3,AC-1,AC-2,AC-3,AC-4,AC-5,D-C8-9")
def test_ft_p_09_ap_signing(
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    request,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Run the FT-P-09-AP scenario and assert AC-1 through AC-4.

    The test skips unless the ``fc_adapter`` fixture resolves to
    ``"ardupilot"`` and ``sitl_replay_ready`` is truthy, and fails outright
    when the MAVLink passkey fixture file is absent. Otherwise it replays the
    Derkachi video, captures and loads the ArduPilot tlog, evaluates the four
    contract checks, records one NFR metric per check, and only then asserts,
    so the metrics are recorded even for a failing run.

    Args:
        vio_strategy: Parameterisation fixture; not referenced in the body.
        evidence_dir: Requested fixture; not referenced in the body.
        run_id: Requested fixture; not referenced in the body.
        nfr_recorder: Object whose ``record_metric(name, value, ac_id=...)``
            receives the measured values.
        request: pytest request object, used to look up ``fc_adapter``.
        sitl_replay_ready: Whether a prepared SITL replay fixture is available.
    """
    # --- Preconditions -----------------------------------------------------
    if request.getfixturevalue("fc_adapter") != "ardupilot":
        pytest.skip("FT-P-09-AP is ArduPilot-only; iNav variant is FT-P-09-iNav (AZ-417)")

    passkey_present = MAVLINK_PASSKEY_FIXTURE.exists()
    if not passkey_present:
        pytest.fail(
            f"mavlink-test-passkey fixture missing at {MAVLINK_PASSKEY_FIXTURE} — "
            "AZ-407 / AZ-408 owns the on-disk fixture."
        )

    if not sitl_replay_ready:
        pytest.skip(
            "FT-P-09-AP full scenario requires `E2E_SITL_REPLAY_DIR` to point "
            "at a prepared SITL replay fixture (AZ-595). Pure-logic AC-1..AC-4 "
            "covered by e2e/_unit_tests/helpers/test_ap_contract_evaluator.py."
        )

    # Imported only once the gates above have passed, so a skipped run never
    # needs these helper modules to be importable.
    from runner.helpers import sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    sitl_host = "ardupilot-sitl"

    # --- Replay + capture --------------------------------------------------
    # Push the Derkachi video through the replayer, then fetch the AP tlog.
    frame_sink = _resolve_frame_sink()
    replayer = FrameSourceReplayer(frame_sink)
    replayer.replay_video(DERKACHI_MP4)
    tlog_path = sitl_observer.capture_ap_tlog(
        host=sitl_host,
        duration_s=REPLAY_WINDOW_S,
    )

    # iter_messages is single-pass, so the stream is turned into a list once
    # and that list is shared by every evaluator below.
    message_stream = mtr.iter_messages(tlog_path)
    msgs = ace.collect_messages_to_list(message_stream)
    if not msgs:
        pytest.fail(f"FT-P-09-AP: empty tlog at {tlog_path}")

    # --- Evaluate the acceptance criteria ----------------------------------
    # AC-1: signing handshake.
    handshake = ace.observe_signing_handshake(msgs)
    # AC-2: GPS_INPUT rate.
    rate = ace.compute_gps_input_rate(msgs)
    # AC-3: EK3_SRC1_POSXY, read back from the autopilot.
    raw_ek3 = sitl_observer.read_ap_parameter(host=sitl_host, name="EK3_SRC1_POSXY")
    ek3_value = int(raw_ek3)
    ek3_ok = ace.validate_ek3_src1_posxy(ek3_value)
    # AC-4: GPS_RAW_INT health.
    health = ace.evaluate_gps_raw_int_health(msgs)

    # --- Record NFR metrics (before asserting) -----------------------------
    record = nfr_recorder.record_metric
    if handshake.lag_s is not None:
        # The handshake lag is only recorded when the evaluator produced one.
        record("ft_p_09_ap.signing_handshake_s", handshake.lag_s, ac_id="AC-1")
    record("ft_p_09_ap.gps_input_rate_hz", rate.observed_rate_hz, ac_id="AC-2")
    record("ft_p_09_ap.ek3_src1_posxy", float(ek3_value), ac_id="AC-3")
    record(
        "ft_p_09_ap.gps_raw_int_healthy_fraction",
        health.healthy_fraction,
        ac_id="AC-4",
    )

    # --- Assertions --------------------------------------------------------
    # Messages stay inline so they are only formatted when a check fails.
    assert handshake.passes, (
        f"AC-1 (signing handshake ≤{ace.HANDSHAKE_BUDGET_S} s, no BAD_SIGNATURE) failed: "
        f"first_signed_us={handshake.first_signed_us}, lag_s={handshake.lag_s}, "
        f"bad_signature_count={handshake.bad_signature_count}"
    )
    assert rate.passes, (
        f"AC-2 (GPS_INPUT ≥{ace.GPS_INPUT_MIN_RATE_HZ} Hz for "
        f"{ace.GPS_INPUT_TARGET_RATE_HZ} Hz target) failed: "
        f"observed_rate_hz={rate.observed_rate_hz:.3f}, frames={rate.frame_count}"
    )
    assert ek3_ok, (
        f"AC-3 (EK3_SRC1_POSXY = {ace.EK3_SRC1_POSXY_REQUIRED}) failed: got {ek3_value}"
    )
    assert health.passes, (
        f"AC-4 (GPS_RAW_INT healthy fraction ≥"
        f"{ace.GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED:.0%}) failed: "
        f"observed={health.healthy_fraction:.4f}, "
        f"healthy={health.healthy_samples}/{health.total_samples}"
    )


def _resolve_frame_sink():  # type: ignore[no-untyped-def]
    """Build the replay-mode `FrameSink` used by this test (counter-only; AZ-597).

    Returns:
        A fresh ``NullFrameSink`` from ``runner.helpers.replay_mode``.
    """
    # Deferred import: the module is only needed once the test actually runs.
    from runner.helpers.replay_mode import NullFrameSink

    sink = NullFrameSink()
    return sink