"""FT-P-13 — GCS command path: operator re-loc hint (AZ-420 / AC-6.2). The full scenario: 1. Drive the SUT into ``dead_reckoned`` state (e.g. via a synthesised mid-blackout segment, FT-N-03 style). ``mavproxy-listener`` captures the SUT↔GCS link to ``gcs_tlog_.tlog``. 2. While the SUT is in ``dead_reckoned``, the fixture builder has injected one ``STATUSTEXT`` from mavproxy carrying the operator's re-loc hint (payload ``RELOC:,,``). 3. The SUT's C8 ``QgcTelemetryAdapter`` translates the inbound command into an ``OperatorCommand`` DTO and emits an FDR ``log`` record with ``payload.kind == "c8.gcs.operator_command"``. 4. The next nav-camera frame after the hint causes C2 to publish a new per-frame ``anchor_search_region`` FDR record whose centre has shifted toward the hint relative to the last pre-hint region. 5. No ``BAD_SIGNATURE`` / ``UNAUTHORIZED`` / ``REJECTED`` STATUSTEXT is emitted in the ack window — the hint is well-formed, not a security event. ACs: * AC-1: FT-P-12 GCS rate — covered by ``test_ft_p_12_gcs_downsample``; this file does NOT re-assert it (single source of truth). * AC-2: hint ack via FDR within ≤2 s — ``correlate_hint_acks``. * AC-3: search prior bias toward hint — ``evaluate_search_region_shift`` against ``anchor_search_region`` FDR records. * AC-4: no security/auth rejection — ``detect_hint_rejection``. * AC-5: parameterised per ``(fc_adapter, vio_strategy)``. Gated on: * ``runner.helpers.frame_source_replay`` — owned by AZ-441 (still a stub today; scenario skips via ``sitl_replay_ready``). * ``runner.helpers.sitl_observer.capture_gcs_tlog`` — owned by AZ-420. * ``runner.helpers.fdr_reader`` — owned by AZ-594. * ``runner.helpers.gcs_telemetry_evaluator`` — unit-tested in ``e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py``. """ from __future__ import annotations from pathlib import Path import pytest from runner.helpers import gcs_telemetry_evaluator as gte from runner.helpers import mavproxy_tlog_reader as mtr DERKACHI_DIR = ( Path(__file__).resolve().parents[3] / "_docs" / "00_problem" / "input_data" / "flight_derkachi" ) DERKACHI_MP4 = DERKACHI_DIR / "flight_derkachi.mp4" REPLAY_WINDOW_S = 60 @pytest.mark.traces_to("AC-6.2,AC-2,AC-3,AC-4,AC-5") def test_ft_p_13_gcs_command( fc_adapter: str, vio_strategy: str, evidence_dir, # type: ignore[no-untyped-def] run_id: str, nfr_recorder, # type: ignore[no-untyped-def] sitl_replay_ready: bool, ) -> None: """Full FT-P-13 scenario (AC-6.2). See module docstring. AC-2: hint ack ≤2 s via FDR ``c8.gcs.operator_command`` record — covered by ``correlate_hint_acks`` + ``HintAckReport.passes``. AC-3: anchor search region biases toward hint — covered by ``evaluate_search_region_shift``. AC-4: no rejection STATUSTEXT in the ack window — covered by ``detect_hint_rejection``. AC-5: parameterised across ``(fc_adapter, vio_strategy)``. """ if not sitl_replay_ready: pytest.skip( "FT-P-13 full replay requires `E2E_SITL_REPLAY_DIR` to point at a " "prepared SITL replay fixture exposing `gcs_tlog_.tlog` " "with an injected `RELOC:` STATUSTEXT plus the matching FDR " "`c8.gcs.operator_command` ack record and `anchor_search_region` " "per-frame records (AZ-595 + AZ-420 fixture builder). Pure-logic " "AC-6.2 coverage lives in " "e2e/_unit_tests/helpers/test_gcs_telemetry_evaluator.py." ) from runner.helpers import fdr_reader, sitl_observer from runner.helpers.frame_source_replay import FrameSourceReplayer sitl_host = "sitl-ardupilot" if fc_adapter == "ardupilot" else "sitl-inav" # 1. Drive replay; the mavproxy-listener and FDR sink capture in parallel. FrameSourceReplayer(_resolve_frame_sink()).replay_video(DERKACHI_MP4) tlog_path = sitl_observer.capture_gcs_tlog(host=sitl_host, duration_s=REPLAY_WINDOW_S) # 2. Materialise the tlog ONCE (iter_messages is single-pass) and # extract the operator-injected RELOC: hints. msgs = gte.collect_messages_to_list(mtr.iter_messages(tlog_path)) if not msgs: pytest.fail(f"FT-P-13: empty GCS tlog at {tlog_path}") hints = gte.extract_inbound_hints(msgs) if not hints: pytest.fail( f"FT-P-13: GCS tlog at {tlog_path} contains no `RELOC:` STATUSTEXT — " "the fixture builder must inject at least one operator re-loc hint." ) # 3. Walk the FDR archive for c8.gcs.operator_command acks + # anchor_search_region per-frame records. fdr_root = Path(evidence_dir).parent / f"run-{run_id}" / "fdr" acks: list[gte.FdrCommandAck] = [] regions: list[gte.SearchRegionRecord] = [] for rec in fdr_reader.iter_records(fdr_root): if ( rec.record_type == "log" and rec.payload.get("kind") == gte.HINT_FDR_KIND and isinstance(rec.payload.get("kv"), dict) ): acks.append( gte.FdrCommandAck( ack_timestamp_us=int(rec.monotonic_ms) * 1000, payload_kv=dict(rec.payload["kv"]), # type: ignore[arg-type] ) ) elif rec.record_type == gte.ANCHOR_SEARCH_REGION_FDR_KIND: regions.append( gte.SearchRegionRecord( monotonic_us=int(rec.monotonic_ms) * 1000, centre_lat_deg=float(rec.payload["centre_lat_deg"]), # type: ignore[arg-type] centre_lon_deg=float(rec.payload["centre_lon_deg"]), # type: ignore[arg-type] radius_m=float(rec.payload["radius_m"]), # type: ignore[arg-type] ) ) # 4. AC-2: ack latencies. ack_report = gte.correlate_hint_acks(hints, acks) # 5. AC-3: search-region shift (evaluated against the FIRST hint only; # multi-hint scenarios are out of scope for AC-6.2 single-pass). first_hint = hints[0] hint_lat, hint_lon, _radius_m = gte.parse_reloc_payload(first_hint.hint_text) shift_report = gte.evaluate_search_region_shift( regions, hint_inject_timestamp_us=first_hint.inject_timestamp_us, hint_lat_deg=hint_lat, hint_lon_deg=hint_lon, ) # 6. AC-4: no rejection in the ack window. rejection_report = gte.detect_hint_rejection(msgs, first_hint.inject_timestamp_us) # 7. NFR metrics. first_latency = ack_report.latencies_ms[0] if ack_report.latencies_ms else None if first_latency is not None: nfr_recorder.record_metric( "ft_p_13.hint_ack_latency_ms", first_latency, ac_id="AC-2" ) nfr_recorder.record_metric( "ft_p_13.hint_count", float(len(hints)), ac_id="AC-2" ) nfr_recorder.record_metric( "ft_p_13.acked_count", float(ack_report.acked_count), ac_id="AC-2" ) if shift_report.distance_after_m is not None: nfr_recorder.record_metric( "ft_p_13.search_region_distance_after_m", shift_report.distance_after_m, ac_id="AC-3", ) if shift_report.distance_before_m is not None: nfr_recorder.record_metric( "ft_p_13.search_region_distance_before_m", shift_report.distance_before_m, ac_id="AC-3", ) nfr_recorder.record_metric( "ft_p_13.rejection_count", float(rejection_report.rejection_count), ac_id="AC-4" ) # 8. AC assertions. assert ack_report.passes, ( f"AC-2 (hint ack ≤{ack_report.max_required_ms} ms via FDR " f"`{gte.HINT_FDR_KIND}` record) failed: " f"hints={len(ack_report.hints)}, acked={ack_report.acked_count}, " f"latencies_ms={ack_report.latencies_ms}" ) assert shift_report.passes, ( "AC-3 (anchor_search_region centre shifts toward hint) failed: " f"region_before={shift_report.region_before}, " f"region_after={shift_report.region_after}, " f"distance_before_m={shift_report.distance_before_m}, " f"distance_after_m={shift_report.distance_after_m}" ) assert rejection_report.passes, ( f"AC-4 (no rejection STATUSTEXT in {rejection_report.window_us / 1e6:.1f} s " "post-inject window) failed: " f"rejection_count={rejection_report.rejection_count}, " f"texts={rejection_report.rejection_texts}" ) def _resolve_frame_sink(): # type: ignore[no-untyped-def] """Return a replay-mode `FrameSink` (counter-only; AZ-597).""" from runner.helpers.replay_mode import NullFrameSink return NullFrameSink()