Files
gps-denied-onboard/e2e/tests/positive/test_ft_p_09_inav.py
T
Oleksandr Bezdieniezhnykh 43fdef1aac [AZ-595] Batch 75: sitl_observer FDR-replay + scenario probe cleanup
Implement all 11 `sitl_observer` public surfaces as an offline
FDR-replay strategy (reads JSON fixtures under `${E2E_SITL_REPLAY_DIR}`
instead of live pymavlink/yamspy). Replace 12 per-scenario
`_harness_helpers_implemented` probes with one shared session-scoped
`sitl_replay_ready` fixture in `e2e/tests/conftest.py`.

Net: -636 LoC of duplicated scenario gating, +17 LoC shared fixture,
+38 new unit tests (596 total, up from 558). Includes K=3 cumulative
review for batches 73-75 (PASS).

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-17 09:00:55 +03:00

145 lines
5.1 KiB
Python

"""FT-P-09-iNav — iNav MSP2_SENSOR_GPS contract conformance (AZ-417 / AC-4.3).
The full scenario:
1. Force ``fc_adapter=inav``; start the SUT against the iNav SITL
container on ``inav-sitl:5760``.
2. AC-1: probe the TCP connection establishment from the SUT side
within ≤5 s (observable via the SITL observer's connection event).
3. Replay 60 s of Derkachi through the SUT.
4. AC-2: count MSP2_SENSOR_GPS (function ID 0x1F03) frame arrivals at
iNav; assert ≥4.5 Hz observed.
5. AC-3: query iNav GPS state via ``msp_gps_toy`` subprocess; assert
``gpsSol.fixType ≥ 3``, ``provider = "MSP"``, ``gpsSol.numSat``
matches the emitted value.
6. AC-4: parameterise per ``vio_strategy`` (``fc_adapter`` fixed to
``inav``).
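Gated on:
* the ``sitl_replay_ready`` fixture — the test skips unless
``E2E_SITL_REPLAY_DIR`` points at a prepared SITL replay fixture (AZ-595)
* ``runner.helpers.frame_source_replay`` — owned by AZ-441
* ``runner.helpers.sitl_observer`` — owned by AZ-407 (iNav probe leg
is part of the iNav-side ``inav_msp_observer`` follow-up)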
Pure-logic AC-2/AC-3 coverage lives in
``e2e/_unit_tests/helpers/test_msp_frame_observer.py``.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from runner.helpers import msp_frame_observer as mfo
# Directory holding the Derkachi flight input data. It is anchored on the
# fourth ancestor of this file (``parents[3]``) so the location does not
# depend on the current working directory.
DERKACHI_DIR = Path(__file__).resolve().parents[3].joinpath(
    "_docs",
    "00_problem",
    "input_data",
    "flight_derkachi",
)
# Video file handed to the frame-source replayer by the scenario.
DERKACHI_MP4 = DERKACHI_DIR.joinpath("flight_derkachi.mp4")
# Passed as ``window_s`` when collecting MSP frames from the iNav observer.
REPLAY_WINDOW_S = 60
# AC-1 bound: passed as ``timeout_s`` to the handshake observer and used as
# the upper limit for the observed connection latency.
TCP_HANDSHAKE_BUDGET_S = 5
@pytest.mark.traces_to("AC-4.3,AC-1,AC-2,AC-3,AC-4")
def test_ft_p_09_inav(
    vio_strategy: str,
    evidence_dir,  # type: ignore[no-untyped-def]
    run_id: str,
    nfr_recorder,  # type: ignore[no-untyped-def]
    request,  # type: ignore[no-untyped-def]
    sitl_replay_ready: bool,
) -> None:
    """Run the end-to-end FT-P-09-iNav scenario for one ``vio_strategy``.

    The scenario is iNav-only: the ``fc_adapter`` fixture is resolved
    dynamically through ``request`` and the test skips for any value other
    than ``"inav"``. It also skips when ``sitl_replay_ready`` is falsy.

    Order of operations: AC-1 TCP handshake check, Derkachi video replay,
    MSP frame collection, AC-2 rate evaluation, AC-3 GPS-state evaluation.
    The four NFR metrics are recorded before the AC-2/AC-3 assertions, so
    they are written even when one of those assertions fails.
    """
    if request.getfixturevalue("fc_adapter") != "inav":
        pytest.skip("FT-P-09-iNav is iNav-only; ardupilot variant is FT-P-09-AP (AZ-416)")
    if not sitl_replay_ready:
        pytest.skip(
            "FT-P-09-iNav full scenario requires `E2E_SITL_REPLAY_DIR` to "
            "point at a prepared SITL replay fixture (AZ-595). Pure-logic "
            "AC-2/AC-3 covered by e2e/_unit_tests/helpers/test_msp_frame_observer.py."
        )

    # Harness helpers are imported only once both skip gates have passed.
    from runner.helpers import sitl_observer
    from runner.helpers.frame_source_replay import FrameSourceReplayer

    inav_host = "inav-sitl"
    inav_port = 5760

    # AC-1: the SUT must establish its TCP connection within the budget.
    handshake = sitl_observer.observe_inav_tcp_handshake(
        host=inav_host,
        port=inav_port,
        timeout_s=TCP_HANDSHAKE_BUDGET_S,
    )
    connect_s = handshake.established_within_s
    assert connect_s is not None, (
        f"AC-1 (TCP connect ≤{TCP_HANDSHAKE_BUDGET_S} s) failed: no connection event"
    )
    assert connect_s <= TCP_HANDSHAKE_BUDGET_S, (
        f"AC-1 (TCP connect ≤{TCP_HANDSHAKE_BUDGET_S} s) failed: "
        f"established_within_s={connect_s}"
    )

    # Replay the Derkachi video through the SUT.
    # NOTE(review): `_resolve_frame_sink` in this module currently raises
    # NotImplementedError, so this step cannot succeed until it is implemented.
    frame_sink = _resolve_frame_sink()
    replayer = FrameSourceReplayer(frame_sink)
    replayer.replay_video(DERKACHI_MP4)

    # Gather the MSP frames seen by the iNav observer and normalise them into
    # the sample type that the pure-logic evaluator consumes.
    capture = sitl_observer.collect_inav_msp_frames(
        host=inav_host,
        port=inav_port,
        window_s=REPLAY_WINDOW_S,
    )
    samples = [
        mfo.MspFrameSample(
            monotonic_ms=int(frame.monotonic_ms),
            function_id=int(frame.function_id),
        )
        for frame in capture.frames
    ]

    # AC-2: observed frame rate.
    rate_report = mfo.compute_rate_hz(samples)

    # AC-3: iNav-side GPS state compared against the emitted satellite count.
    state = sitl_observer.query_inav_gps_state(host=inav_host)
    snapshot = mfo.InavGpsSnapshot(
        fix_type=int(state.fix_type),
        num_sat=int(state.num_sat),
        provider=str(state.provider),
    )
    gps_report = mfo.evaluate_inav_gps_state(
        snapshot,
        expected_num_sat=int(capture.expected_num_sat),
    )

    # Record NFR metrics first, then assert.
    record = nfr_recorder.record_metric
    record("ft_p_09_inav.frame_count", float(rate_report.frame_count), ac_id="AC-2")
    record("ft_p_09_inav.observed_rate_hz", rate_report.observed_rate_hz, ac_id="AC-2")
    record("ft_p_09_inav.tcp_handshake_s", float(connect_s), ac_id="AC-1")
    record("ft_p_09_inav.fix_type", float(gps_report.snapshot.fix_type), ac_id="AC-3")

    assert rate_report.passes, (
        f"AC-2 (≥{mfo.MIN_OBSERVED_RATE_HZ} Hz for {mfo.DEFAULT_TARGET_RATE_HZ} Hz target) failed: "
        f"observed_rate_hz={rate_report.observed_rate_hz:.3f}, "
        f"frames={rate_report.frame_count}, window_ms={rate_report.window_ms}"
    )
    assert gps_report.passes, (
        f"AC-3 failed: fix_type_ok={gps_report.fix_type_ok}, "
        f"provider_ok={gps_report.provider_ok}, num_sat_ok={gps_report.num_sat_ok}; "
        f"snapshot={gps_report.snapshot}, expected_num_sat={gps_report.expected_num_sat}"
    )
def _resolve_frame_sink(): # type: ignore[no-untyped-def]
raise NotImplementedError(
"frame sink resolution is owned by AZ-441 / runner.helpers.frame_source_replay"
)