Files
gps-denied-onboard/e2e/tests/positive/test_ft_p_09_ap_signing.py
T
Oleksandr Bezdieniezhnykh a644debdb7 [AZ-416] [AZ-417] [AZ-419] Test batch 72: FT-P-09 AP/iNav + FT-P-11 cold start
- AZ-416 (FT-P-09-AP): fills mavproxy_tlog_reader.iter_messages with
  pymavlink body (AZ-406 surface kept); adds ap_contract_evaluator
  covering AC-1 (signing handshake <=5s), AC-2 (GPS_INPUT >=4.5 Hz),
  AC-3 (EK3_SRC1_POSXY=3), AC-4 (GPS_RAW_INT health >=80%); scenario
  forces fc_adapter=ardupilot.
- AZ-417 (FT-P-09-iNav): msp_frame_observer covering AC-2 (MSP rate)
  and AC-3 (fix_type/provider/numSat); scenario forces
  fc_adapter=inav.
- AZ-419 (FT-P-11): cold_start_evaluator covering AC-1 (operator
  manifest origin), AC-2 (FC EKF fallback), AC-3 (no-origin abort),
  AC-4 (bounded-delta conflict, ADR-010 Principle #11 amended);
  scenario parametrized on origin_source plus dedicated no-origin
  abort scenario.
- All scenarios skip-gated on upstream frame_source_replay /
  imu_replay / fdr_reader / sitl_observer extensions.
- +67 unit tests; full e2e unit suite: 460 passed.
- K=3 cumulative review fired: PASS for batches 70-72.

See _docs/03_implementation/batch_72_report.md,
_docs/03_implementation/reviews/batch_72_review.md,
_docs/03_implementation/cumulative_review_batches_70-72_cycle1_report.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 07:49:17 +03:00

185 lines
6.6 KiB
Python

"""FT-P-09-AP — ArduPilot GPS_INPUT contract + MAVLink 2.0 signing (AZ-416 / AC-4.3).
The full scenario:
1. Force ``fc_adapter=ardupilot``; load ``mavlink-test-passkey.txt``
as the docker secret feeding the SUT signing channel.
2. Start the SUT against the ArduPilot SITL container; mavproxy-listener
captures the wire traffic to a ``.tlog``.
3. AC-1: parse the ``.tlog``; first signed frame must arrive within
≤5 s of the first observed message; no ``BAD_SIGNATURE`` STATUSTEXT
in that window.
4. Replay 60 s of Derkachi through the SUT (signed GPS_INPUT flow).
5. AC-2: GPS_INPUT cadence over the full ``.tlog`` ≥4.5 Hz.
6. AC-3: ``EK3_SRC1_POSXY`` (read via mavproxy parameter request) ==
3 (GPS source).
7. AC-4: GPS_RAW_INT health (``fix_type ≥ 3`` AND ``eph ≤ 200``)
for ≥80 % of the window.
8. AC-5: parameterised per ``vio_strategy`` (``fc_adapter`` fixed to
``ardupilot``).
Gated on:
* ``runner.helpers.frame_source_replay`` — owned by AZ-441
* ``runner.helpers.sitl_observer`` — owned by AZ-407 (AP-side leg
``capture_ap_tlog`` + ``read_ap_parameter``)
Pure-logic AC-1/AC-2/AC-3/AC-4 coverage lives in
``e2e/_unit_tests/helpers/test_ap_contract_evaluator.py`` and
``e2e/_unit_tests/helpers/test_mavproxy_tlog_reader.py``.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from runner.helpers import ap_contract_evaluator as ace
from runner.helpers import mavproxy_tlog_reader as mtr
DERKACHI_DIR = (
Path(__file__).resolve().parents[3]
/ "_docs"
/ "00_problem"
/ "input_data"
/ "flight_derkachi"
)
DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4"
MAVLINK_PASSKEY_FIXTURE = (
Path(__file__).resolve().parents[2]
/ "fixtures"
/ "secrets"
/ "mavlink-test-passkey.txt"
)
REPLAY_WINDOW_S = 60
@pytest.fixture(scope="module")
def _ap_harness_implemented() -> bool:
"""True iff frame_source_replay + sitl_observer AP-side leg are real."""
from runner.helpers import sitl_observer
from runner.helpers.frame_source_replay import FrameSourceReplayer
try:
replayer = FrameSourceReplayer(sink=_NullSink()) # type: ignore[arg-type]
try:
replayer.replay_video(Path("/tmp/non-existent.mp4"))
except NotImplementedError:
return False
try:
sitl_observer.capture_ap_tlog(host="ardupilot-sitl", duration_s=0.01)
except (NotImplementedError, AttributeError):
return False
try:
sitl_observer.read_ap_parameter(host="ardupilot-sitl", name="EK3_SRC1_POSXY")
except (NotImplementedError, AttributeError):
return False
return True
except Exception:
return False
class _NullSink:
def write_frame(self, jpeg_bytes: bytes, timestamp_ms: int) -> None:
return None
@pytest.mark.traces_to("AC-4.3,AC-1,AC-2,AC-3,AC-4,AC-5,D-C8-9")
def test_ft_p_09_ap_signing(
vio_strategy: str,
evidence_dir, # type: ignore[no-untyped-def]
run_id: str,
nfr_recorder, # type: ignore[no-untyped-def]
request, # type: ignore[no-untyped-def]
_ap_harness_implemented: bool,
) -> None:
"""Full FT-P-09-AP scenario; parameterized per vio_strategy."""
fc_adapter = request.getfixturevalue("fc_adapter")
if fc_adapter != "ardupilot":
pytest.skip("FT-P-09-AP is ArduPilot-only; iNav variant is FT-P-09-iNav (AZ-417)")
if not MAVLINK_PASSKEY_FIXTURE.exists():
pytest.fail(
f"mavlink-test-passkey fixture missing at {MAVLINK_PASSKEY_FIXTURE}"
"AZ-407 / AZ-408 owns the on-disk fixture."
)
if not _ap_harness_implemented:
pytest.skip(
"FT-P-09-AP full scenario requires runner.helpers.{frame_source_replay,"
"sitl_observer.capture_ap_tlog,sitl_observer.read_ap_parameter} — "
"currently AZ-441 / AZ-407 leftovers. Pure-logic AC-1..AC-4 covered by "
"e2e/_unit_tests/helpers/test_ap_contract_evaluator.py."
)
from runner.helpers import sitl_observer
from runner.helpers.frame_source_replay import FrameSourceReplayer
# 1. Drive replay (captures tlog continuously via mavproxy-listener).
FrameSourceReplayer(_resolve_frame_sink()).replay_video(DERKACHI_MP4)
tlog_path = sitl_observer.capture_ap_tlog(
host="ardupilot-sitl", duration_s=REPLAY_WINDOW_S,
)
# 2. Materialise the tlog ONCE (iter_messages is single-pass).
msgs = ace.collect_messages_to_list(mtr.iter_messages(tlog_path))
if not msgs:
pytest.fail(f"FT-P-09-AP: empty tlog at {tlog_path}")
# 3. AC-1: signing handshake.
handshake = ace.observe_signing_handshake(msgs)
# 4. AC-2: GPS_INPUT rate.
rate = ace.compute_gps_input_rate(msgs)
# 5. AC-3: EK3_SRC1_POSXY param read.
ek3_value = int(sitl_observer.read_ap_parameter(
host="ardupilot-sitl", name="EK3_SRC1_POSXY"
))
ek3_ok = ace.validate_ek3_src1_posxy(ek3_value)
# 6. AC-4: GPS_RAW_INT health.
health = ace.evaluate_gps_raw_int_health(msgs)
# 7. NFR metrics + assertions.
if handshake.lag_s is not None:
nfr_recorder.record_metric(
"ft_p_09_ap.signing_handshake_s", handshake.lag_s, ac_id="AC-1"
)
nfr_recorder.record_metric(
"ft_p_09_ap.gps_input_rate_hz", rate.observed_rate_hz, ac_id="AC-2"
)
nfr_recorder.record_metric(
"ft_p_09_ap.ek3_src1_posxy", float(ek3_value), ac_id="AC-3"
)
nfr_recorder.record_metric(
"ft_p_09_ap.gps_raw_int_healthy_fraction", health.healthy_fraction, ac_id="AC-4"
)
assert handshake.passes, (
f"AC-1 (signing handshake ≤{ace.HANDSHAKE_BUDGET_S} s, no BAD_SIGNATURE) failed: "
f"first_signed_us={handshake.first_signed_us}, lag_s={handshake.lag_s}, "
f"bad_signature_count={handshake.bad_signature_count}"
)
assert rate.passes, (
f"AC-2 (GPS_INPUT ≥{ace.GPS_INPUT_MIN_RATE_HZ} Hz for "
f"{ace.GPS_INPUT_TARGET_RATE_HZ} Hz target) failed: "
f"observed_rate_hz={rate.observed_rate_hz:.3f}, frames={rate.frame_count}"
)
assert ek3_ok, (
f"AC-3 (EK3_SRC1_POSXY = {ace.EK3_SRC1_POSXY_REQUIRED}) failed: got {ek3_value}"
)
assert health.passes, (
f"AC-4 (GPS_RAW_INT healthy fraction ≥"
f"{ace.GPS_RAW_INT_HEALTHY_FRACTION_REQUIRED:.0%}) failed: "
f"observed={health.healthy_fraction:.4f}, "
f"healthy={health.healthy_samples}/{health.total_samples}"
)
def _resolve_frame_sink(): # type: ignore[no-untyped-def]
raise NotImplementedError(
"frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay"
)