mirror of
https://github.com/azaion/gps-denied-onboard.git
synced 2026-06-22 21:01:13 +00:00
[AZ-420] Batch 81: FT-P-12 + FT-P-13 GCS scenarios
FT-P-12: parse mavproxy-listener tlog over a 60 s Derkachi replay and assert SUT->GCS GLOBAL_POSITION_INT cadence lands in [1, 2] Hz (AC-6.1). FT-P-13: inject `RELOC:<lat>,<lon>,<radius_m>` STATUSTEXT while the SUT is in dead_reckoned; verify FDR `c8.gcs.operator_command` ack <=2s, `anchor_search_region` centre shifts toward the hint, and no BAD_SIGNATURE / UNAUTHORIZED / REJECTED STATUSTEXT lands in the post-inject window (AC-6.2). Adds runner.helpers.gcs_telemetry_evaluator (rate, hint-ack correlation, haversine search-region shift, rejection scan) and sitl_observer.capture_gcs_tlog (parity surface to capture_ap_tlog). Pure-logic coverage: 39 new unit tests; full e2e/_unit_tests/ suite 746 passing (was 700). Scenarios skip locally on missing SITL replay fixture; production hooks (inbound STATUSTEXT parser, anchor_search_region FDR emitter) tracked outside this task. See _docs/03_implementation/batch_81_report.md + reviews/batch_81_review.md. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,549 @@
|
||||
"""Unit tests for ``runner.helpers.gcs_telemetry_evaluator`` (AZ-420).
|
||||
|
||||
The pure-logic AC-6.1 / AC-6.2 coverage scenarios for FT-P-12 + FT-P-13.
|
||||
The full e2e scenarios in ``e2e/tests/positive/test_ft_p_1[23]_*.py``
|
||||
exercise the same helpers end-to-end when ``E2E_SITL_REPLAY_DIR`` is
|
||||
prepared; this file covers the helpers in isolation so AC verification
|
||||
does not depend on the SITL fixture.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
from runner.helpers import gcs_telemetry_evaluator as gte
|
||||
from runner.helpers.mavproxy_tlog_reader import TlogMessage
|
||||
|
||||
|
||||
def _gpi(timestamp_us: int) -> TlogMessage:
|
||||
"""Construct a minimal GLOBAL_POSITION_INT TlogMessage for tests."""
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="GLOBAL_POSITION_INT",
|
||||
signed=True,
|
||||
fields={"lat": 0, "lon": 0, "alt": 0},
|
||||
)
|
||||
|
||||
|
||||
def _nvf(timestamp_us: int) -> TlogMessage:
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="NAMED_VALUE_FLOAT",
|
||||
signed=True,
|
||||
fields={"name": b"horiz_m", "value": 7.5},
|
||||
)
|
||||
|
||||
|
||||
def _statustext(timestamp_us: int, text: str) -> TlogMessage:
|
||||
return TlogMessage(
|
||||
timestamp_us=timestamp_us,
|
||||
msg_type="STATUSTEXT",
|
||||
signed=False,
|
||||
fields={"severity": 4, "text": text},
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────── compute_gcs_summary_rate ───────────────────
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_passes_within_band() -> None:
|
||||
# Arrange: 60 GLOBAL_POSITION_INT at 1.5 Hz over 60 s.
|
||||
interval_us = int(1_000_000 / 1.5)
|
||||
msgs = [_gpi(i * interval_us) for i in range(91)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 1.5, abs_tol=1e-3)
|
||||
assert report.total_summary_messages == 91
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_fails_below_band() -> None:
|
||||
# Arrange: 0.5 Hz cadence over 60 s.
|
||||
interval_us = 2_000_000
|
||||
msgs = [_gpi(i * interval_us) for i in range(31)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 0.5, abs_tol=1e-3)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_fails_above_band() -> None:
|
||||
# Arrange: 5 Hz cadence (matches the un-downsampled emit_summary).
|
||||
interval_us = 200_000
|
||||
msgs = [_gpi(i * interval_us) for i in range(301)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 5.0, abs_tol=1e-3)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_ignores_companion_named_value_float() -> None:
|
||||
# Arrange: interleave NAMED_VALUE_FLOAT companions; they MUST NOT be
|
||||
# counted as separate summary bursts (avoids double-counting).
|
||||
interval_us = int(1_000_000 / 1.5)
|
||||
msgs = [_gpi(i * interval_us) for i in range(91)]
|
||||
msgs.extend(_nvf(i * interval_us + 1) for i in range(91))
|
||||
msgs.sort(key=lambda m: m.timestamp_us)
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 91
|
||||
assert math.isclose(report.observed_rate_hz, 1.5, abs_tol=1e-3)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_handles_empty_input() -> None:
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate([])
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 0
|
||||
assert report.window_us == 0
|
||||
assert report.observed_rate_hz == 0.0
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_handles_single_message() -> None:
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate([_gpi(0)])
|
||||
|
||||
# Assert
|
||||
assert report.total_summary_messages == 1
|
||||
assert report.window_us == 0
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_rejects_negative_min_hz() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="min_required_hz must be ≥0"):
|
||||
gte.compute_gcs_summary_rate([_gpi(0)], min_required_hz=-1.0)
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_rejects_inverted_band() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="max_required_hz"):
|
||||
gte.compute_gcs_summary_rate([_gpi(0)], min_required_hz=2.0, max_required_hz=1.0)
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_accepts_boundary_min() -> None:
|
||||
# Arrange: exactly 1 Hz.
|
||||
msgs = [_gpi(i * 1_000_000) for i in range(11)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 1.0, abs_tol=1e-6)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_compute_gcs_summary_rate_accepts_boundary_max() -> None:
|
||||
# Arrange: exactly 2 Hz.
|
||||
msgs = [_gpi(i * 500_000) for i in range(21)]
|
||||
|
||||
# Act
|
||||
report = gte.compute_gcs_summary_rate(msgs)
|
||||
|
||||
# Assert
|
||||
assert math.isclose(report.observed_rate_hz, 2.0, abs_tol=1e-6)
|
||||
assert report.passes
|
||||
|
||||
|
||||
# ─────────────────── extract_inbound_hints ───────────────────
|
||||
|
||||
|
||||
def test_extract_inbound_hints_finds_reloc_prefix() -> None:
|
||||
# Arrange
|
||||
msgs = [
|
||||
_statustext(1_000_000, "RELOC:50.0,36.0,200"),
|
||||
_statustext(2_000_000, "EKF position alert"),
|
||||
_statustext(3_000_000, "RELOC:50.1,36.1,250"),
|
||||
]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs)
|
||||
|
||||
# Assert
|
||||
assert [h.inject_timestamp_us for h in hints] == [1_000_000, 3_000_000]
|
||||
assert hints[0].hint_text == "RELOC:50.0,36.0,200"
|
||||
|
||||
|
||||
def test_extract_inbound_hints_ignores_non_statustext() -> None:
|
||||
# Arrange
|
||||
msgs = [_gpi(0), _nvf(1_000_000), _statustext(2_000_000, "RELOC:1,2,3")]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs)
|
||||
|
||||
# Assert
|
||||
assert len(hints) == 1
|
||||
assert hints[0].inject_timestamp_us == 2_000_000
|
||||
|
||||
|
||||
def test_extract_inbound_hints_honors_custom_prefix() -> None:
|
||||
# Arrange
|
||||
msgs = [
|
||||
_statustext(1_000_000, "HINT:50,36,200"),
|
||||
_statustext(2_000_000, "RELOC:50,36,200"),
|
||||
]
|
||||
|
||||
# Act
|
||||
hints = gte.extract_inbound_hints(msgs, hint_prefix="HINT:")
|
||||
|
||||
# Assert
|
||||
assert len(hints) == 1
|
||||
assert hints[0].hint_text == "HINT:50,36,200"
|
||||
|
||||
|
||||
# ─────────────────── parse_reloc_payload ───────────────────
|
||||
|
||||
|
||||
def test_parse_reloc_payload_returns_triplet() -> None:
|
||||
# Assert
|
||||
assert gte.parse_reloc_payload("RELOC:50.0,36.0,200.5") == (50.0, 36.0, 200.5)
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_wrong_prefix() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="does not start with 'RELOC:'"):
|
||||
gte.parse_reloc_payload("HINT:50,36,200")
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_wrong_field_count() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="3 comma-separated fields"):
|
||||
gte.parse_reloc_payload("RELOC:50,36")
|
||||
|
||||
|
||||
def test_parse_reloc_payload_rejects_non_float_fields() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="must be floats"):
|
||||
gte.parse_reloc_payload("RELOC:north,east,200")
|
||||
|
||||
|
||||
# ─────────────────── correlate_hint_acks ───────────────────
|
||||
|
||||
|
||||
def test_correlate_hint_acks_pairs_in_injection_order() -> None:
|
||||
# Arrange
|
||||
hints = (
|
||||
gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:50,36,200"),
|
||||
gte.InboundHint(inject_timestamp_us=5_000_000, hint_text="RELOC:51,37,200"),
|
||||
)
|
||||
acks = (
|
||||
gte.FdrCommandAck(ack_timestamp_us=2_500_000, payload_kv={"command": "STATUSTEXT", "i": 0}),
|
||||
gte.FdrCommandAck(ack_timestamp_us=6_500_000, payload_kv={"command": "STATUSTEXT", "i": 1}),
|
||||
)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 2
|
||||
assert report.latencies_ms == (1500.0, 1500.0)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_marks_missing_ack_as_none() -> None:
|
||||
# Arrange
|
||||
hints = (gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks: tuple[gte.FdrCommandAck, ...] = ()
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 0
|
||||
assert report.latencies_ms == (None,)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_fails_when_latency_exceeds_budget() -> None:
|
||||
# Arrange: 2.5 s latency vs 2.0 s budget.
|
||||
hints = (gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks = (gte.FdrCommandAck(ack_timestamp_us=3_500_000, payload_kv={"command": "STATUSTEXT"}),)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 1
|
||||
assert report.latencies_ms == (2500.0,)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_ignores_pre_hint_acks() -> None:
|
||||
# Arrange
|
||||
hints = (gte.InboundHint(inject_timestamp_us=5_000_000, hint_text="RELOC:1,2,3"),)
|
||||
acks = (
|
||||
gte.FdrCommandAck(ack_timestamp_us=1_000_000, payload_kv={"command": "STATUSTEXT"}),
|
||||
gte.FdrCommandAck(ack_timestamp_us=6_000_000, payload_kv={"command": "STATUSTEXT"}),
|
||||
)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.acked_count == 1
|
||||
assert report.latencies_ms == (1000.0,)
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_each_ack_matches_only_once() -> None:
|
||||
# Arrange: two hints, one ack — second hint must show as unacked.
|
||||
hints = (
|
||||
gte.InboundHint(inject_timestamp_us=1_000_000, hint_text="RELOC:1,2,3"),
|
||||
gte.InboundHint(inject_timestamp_us=2_000_000, hint_text="RELOC:1,2,3"),
|
||||
)
|
||||
acks = (gte.FdrCommandAck(ack_timestamp_us=1_500_000, payload_kv={"command": "STATUSTEXT"}),)
|
||||
|
||||
# Act
|
||||
report = gte.correlate_hint_acks(hints, acks)
|
||||
|
||||
# Assert
|
||||
assert report.latencies_ms == (500.0, None)
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_correlate_hint_acks_handles_no_hints() -> None:
|
||||
# Act
|
||||
report = gte.correlate_hint_acks((), ())
|
||||
|
||||
# Assert
|
||||
assert report.latencies_ms == ()
|
||||
assert not report.passes # no hints injected → can't certify AC-2
|
||||
|
||||
|
||||
# ─────────────────── haversine_distance_m ───────────────────
|
||||
|
||||
|
||||
def test_haversine_distance_m_is_zero_for_same_point() -> None:
|
||||
# Assert
|
||||
assert gte.haversine_distance_m(50.0, 36.0, 50.0, 36.0) == pytest.approx(0.0, abs=1e-6)
|
||||
|
||||
|
||||
def test_haversine_distance_m_known_baseline() -> None:
|
||||
# Arrange: ~1 deg of latitude near the equator ≈ 111.195 km on a
|
||||
# spherical earth with mean radius 6_371_008.8 m.
|
||||
expected_m = math.radians(1.0) * 6_371_008.8
|
||||
|
||||
# Act
|
||||
distance = gte.haversine_distance_m(0.0, 0.0, 1.0, 0.0)
|
||||
|
||||
# Assert
|
||||
assert distance == pytest.approx(expected_m, rel=1e-6)
|
||||
|
||||
|
||||
def test_haversine_distance_m_is_symmetric() -> None:
|
||||
# Arrange
|
||||
a = (50.0, 36.0)
|
||||
b = (50.5, 36.5)
|
||||
|
||||
# Act
|
||||
d_ab = gte.haversine_distance_m(*a, *b)
|
||||
d_ba = gte.haversine_distance_m(*b, *a)
|
||||
|
||||
# Assert
|
||||
assert d_ab == pytest.approx(d_ba, rel=1e-9)
|
||||
|
||||
|
||||
# ─────────────────── evaluate_search_region_shift ───────────────────
|
||||
|
||||
|
||||
def _region(monotonic_us: int, lat: float, lon: float, radius_m: float = 100.0) -> gte.SearchRegionRecord:
|
||||
return gte.SearchRegionRecord(
|
||||
monotonic_us=monotonic_us, centre_lat_deg=lat, centre_lon_deg=lon, radius_m=radius_m
|
||||
)
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_passes_when_post_moves_closer() -> None:
|
||||
# Arrange: hint at (50.0, 36.0); pre-region was 1 km north; post is 200 m north.
|
||||
pre = _region(1_000_000, 50.01, 36.0) # ~1.1 km from hint
|
||||
post = _region(3_000_000, 50.002, 36.0) # ~222 m from hint
|
||||
regions = [pre, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is pre
|
||||
assert report.region_after is post
|
||||
assert report.distance_before_m is not None and report.distance_after_m is not None
|
||||
assert report.distance_after_m < report.distance_before_m
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_fails_when_post_moves_further() -> None:
|
||||
# Arrange
|
||||
pre = _region(1_000_000, 50.001, 36.0)
|
||||
post = _region(3_000_000, 50.01, 36.0)
|
||||
regions = [pre, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_passes_when_no_pre_region() -> None:
|
||||
# Arrange: no pre-hint region — any post-hint region counts as a pass.
|
||||
post = _region(3_000_000, 50.0, 36.0)
|
||||
regions = [post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is None
|
||||
assert report.region_after is post
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_fails_when_no_post_region() -> None:
|
||||
# Arrange
|
||||
pre = _region(1_000_000, 50.0, 36.0)
|
||||
regions = [pre]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_after is None
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_evaluate_search_region_shift_keeps_latest_pre_region() -> None:
|
||||
# Arrange: three pre-hint regions; the LAST one is the relevant baseline.
|
||||
far = _region(500_000, 50.05, 36.0)
|
||||
close = _region(1_500_000, 50.005, 36.0)
|
||||
post = _region(3_000_000, 50.002, 36.0)
|
||||
regions = [far, close, post]
|
||||
|
||||
# Act
|
||||
report = gte.evaluate_search_region_shift(
|
||||
regions, hint_inject_timestamp_us=2_000_000, hint_lat_deg=50.0, hint_lon_deg=36.0
|
||||
)
|
||||
|
||||
# Assert
|
||||
assert report.region_before is close
|
||||
# The "before → after" delta must be measured against `close`, not `far`.
|
||||
expected_pre_dist = gte.haversine_distance_m(50.005, 36.0, 50.0, 36.0)
|
||||
assert report.distance_before_m == pytest.approx(expected_pre_dist, rel=1e-9)
|
||||
|
||||
|
||||
# ─────────────────── detect_hint_rejection ───────────────────
|
||||
|
||||
|
||||
def test_detect_hint_rejection_finds_bad_signature() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "BAD_SIGNATURE on hint accept path")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 1
|
||||
assert not report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_ignores_pre_window_rejections() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(1_000_000, "BAD_SIGNATURE")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_ignores_post_window_rejections() -> None:
|
||||
# Arrange: window default 2_000_000 us → ends at 4_000_000 us.
|
||||
msgs = [_statustext(5_000_000, "REJECTED hint")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_passes_on_unrelated_statustext() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "EKF position OK")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 0
|
||||
assert report.passes
|
||||
|
||||
|
||||
def test_detect_hint_rejection_is_case_insensitive() -> None:
|
||||
# Arrange
|
||||
msgs = [_statustext(2_500_000, "bad_signature on hint accept path")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_count == 1
|
||||
|
||||
|
||||
def test_detect_hint_rejection_records_full_text() -> None:
|
||||
# Arrange: rejection text is preserved with its original case for debugging.
|
||||
msgs = [_statustext(2_500_000, "UNAUTHORIZED hint from operator X")]
|
||||
|
||||
# Act
|
||||
report = gte.detect_hint_rejection(msgs, inject_timestamp_us=2_000_000)
|
||||
|
||||
# Assert
|
||||
assert report.rejection_texts == ("UNAUTHORIZED hint from operator X",)
|
||||
|
||||
|
||||
def test_detect_hint_rejection_rejects_non_positive_window() -> None:
|
||||
# Assert
|
||||
with pytest.raises(ValueError, match="window_us must be > 0"):
|
||||
gte.detect_hint_rejection([], inject_timestamp_us=0, window_us=0)
|
||||
|
||||
|
||||
# ─────────────────── collect_messages_to_list ───────────────────
|
||||
|
||||
|
||||
def test_collect_messages_to_list_materialises_iterator() -> None:
|
||||
# Arrange
|
||||
def _gen():
|
||||
yield _gpi(0)
|
||||
yield _gpi(1)
|
||||
|
||||
# Act
|
||||
materialised = gte.collect_messages_to_list(_gen())
|
||||
|
||||
# Assert
|
||||
assert len(materialised) == 2
|
||||
assert all(isinstance(m, TlogMessage) for m in materialised)
|
||||
@@ -473,6 +473,39 @@ def test_capture_ap_tlog_zero_duration_raises():
|
||||
so.capture_ap_tlog(host="x", duration_s=0)
|
||||
|
||||
|
||||
# capture_gcs_tlog
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_missing_env_raises(unset_replay_dir):
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="env var not set"):
|
||||
so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_missing_file_raises(replay_dir: Path):
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="fixture not found"):
|
||||
so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_returns_path(replay_dir: Path):
|
||||
# Arrange
|
||||
tlog = replay_dir / "gcs_tlog_sitl-ardupilot.tlog"
|
||||
tlog.write_bytes(b"\x00\x01\x02")
|
||||
|
||||
# Act
|
||||
out = so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)
|
||||
|
||||
# Assert
|
||||
assert out == tlog
|
||||
|
||||
|
||||
def test_capture_gcs_tlog_zero_duration_raises():
|
||||
# Assert
|
||||
with pytest.raises(RuntimeError, match="duration_s must be positive"):
|
||||
so.capture_gcs_tlog(host="x", duration_s=0)
|
||||
|
||||
|
||||
# read_ap_parameter
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user