"""Unit tests for `e2e/runner/helpers/sitl_observer.py` (AZ-595).""" from __future__ import annotations import json import os from pathlib import Path import pytest from e2e.runner.helpers import sitl_observer as so @pytest.fixture def replay_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: """Sets `${E2E_SITL_REPLAY_DIR}` to a tmp dir for the duration of the test.""" monkeypatch.setenv("E2E_SITL_REPLAY_DIR", str(tmp_path)) return tmp_path @pytest.fixture def unset_replay_dir(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("E2E_SITL_REPLAY_DIR", raising=False) def _write_json(path: Path, content) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(content)) # replay_dir / replay_dir_available def test_replay_dir_unset_returns_none(unset_replay_dir): # Assert assert so.replay_dir() is None assert so.replay_dir_available() is False def test_replay_dir_set_but_missing_returns_false(monkeypatch: pytest.MonkeyPatch, tmp_path: Path): # Arrange monkeypatch.setenv("E2E_SITL_REPLAY_DIR", str(tmp_path / "nope")) # Assert assert so.replay_dir_available() is False def test_replay_dir_set_and_exists_returns_true(replay_dir: Path): # Assert assert so.replay_dir_available() is True def test_replay_dir_whitespace_env_treated_as_unset(monkeypatch: pytest.MonkeyPatch): # Arrange monkeypatch.setenv("E2E_SITL_REPLAY_DIR", " ") # Assert assert so.replay_dir() is None # read_ekf_divergence_events def test_read_ekf_events_empty_without_env(unset_replay_dir): # Assert assert so.read_ekf_divergence_events() == [] def test_read_ekf_events_empty_when_file_missing(replay_dir: Path): # Assert assert so.read_ekf_divergence_events() == [] def test_read_ekf_events_parses_records(replay_dir: Path): # Arrange _write_json( replay_dir / "ekf_divergence_events.json", [ {"monotonic_ms": 1000, "severity": "WARNING", "message": "EKF_X drift"}, {"monotonic_ms": 2000, "severity": "CRITICAL", "message": "EKF_X reset"}, ], ) # Act events = 
so.read_ekf_divergence_events() # Assert assert len(events) == 2 assert events[0] == so.EkfDivergenceEvent( monotonic_ms=1000, severity="WARNING", message="EKF_X drift" ) def test_read_ekf_events_malformed_raises(replay_dir: Path): # Arrange _write_json(replay_dir / "ekf_divergence_events.json", [{"monotonic_ms": "bad"}]) # Act / Assert with pytest.raises(RuntimeError, match="EKF divergence fixture malformed"): so.read_ekf_divergence_events() def test_read_ekf_events_wrong_top_level_type_raises(replay_dir: Path): # Arrange _write_json(replay_dir / "ekf_divergence_events.json", {"not": "a list"}) # Act / Assert with pytest.raises(RuntimeError, match="must be a JSON list"): so.read_ekf_divergence_events() # read_gps_health_samples def test_read_gps_health_parses(replay_dir: Path): # Arrange _write_json( replay_dir / "gps_health_samples.json", [ {"monotonic_ms": 0, "healthy": True, "spoofed": False}, {"monotonic_ms": 1000, "healthy": False, "spoofed": True}, ], ) # Act samples = so.read_gps_health_samples() # Assert assert len(samples) == 2 assert samples[1].spoofed is True def test_read_gps_health_empty_without_env(unset_replay_dir): # Assert assert so.read_gps_health_samples() == [] # read_consistency_check_events def test_read_consistency_check_parses(replay_dir: Path): # Arrange _write_json( replay_dir / "consistency_check_events.json", [{"monotonic_ms": 5000, "passed": True}], ) # Act events = so.read_consistency_check_events() # Assert assert events == [so.ConsistencyCheckEvent(monotonic_ms=5000, passed=True)] def test_read_consistency_check_empty_without_env(unset_replay_dir): # Assert assert so.read_consistency_check_events() == [] # get_observer def test_get_observer_missing_env_raises(unset_replay_dir): # Assert with pytest.raises(RuntimeError, match="env var not set"): so.get_observer("ardupilot", "sitl-host") def test_get_observer_missing_fixture_raises(replay_dir: Path): # Assert with pytest.raises(RuntimeError, match="required fixture not found"): 
so.get_observer("ardupilot", "sitl-host") def test_get_observer_read_gps_state(replay_dir: Path): # Arrange _write_json( replay_dir / "observer_ardupilot_sitl-host.json", { "gps_state": { "primary_source": "MAV", "last_position_lat_deg": 50.0, "last_position_lon_deg": 30.0, "last_position_alt_m": 250.0, "fix_quality": 3, "horizontal_accuracy_m": 1.5, "last_update_age_ms": 100, }, "parameters": {"EK3_SRC1_POSXY": 3}, }, ) # Act obs = so.get_observer("ardupilot", "sitl-host") gps = obs.read_gps_state() # Assert assert gps.primary_source == "MAV" assert gps.fix_quality == 3 assert obs.read_parameter("EK3_SRC1_POSXY") == 3 assert obs.read_parameter("MISSING") is None def test_get_observer_missing_gps_state_raises(replay_dir: Path): # Arrange _write_json(replay_dir / "observer_inav_h.json", {"parameters": {}}) # Act / Assert obs = so.get_observer("inav", "h") with pytest.raises(RuntimeError, match="fixture missing `gps_state`"): obs.read_gps_state() # wait_for_outbound (AZ-598) def _write_observer_fixture(replay_dir: Path, fc_kind: str, host: str) -> None: """Write the minimal `observer__.json` so `get_observer` succeeds.""" _write_json( replay_dir / f"observer_{fc_kind}_{host}.json", { "gps_state": { "primary_source": "MAV", "last_position_lat_deg": 0.0, "last_position_lon_deg": 0.0, "last_position_alt_m": 0.0, "fix_quality": 3, "horizontal_accuracy_m": 1.0, "last_update_age_ms": 0, }, "parameters": {}, }, ) def test_wait_for_outbound_advances_cursor_in_order(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", { "messages": [ {"image_id": "AD000001.jpg", "lat_deg": 48.275292, "lon_deg": 37.385220}, {"image_id": "AD000002.jpg", "lat_deg": 48.275001, "lon_deg": 37.382922}, ] }, ) obs = so.get_observer("ardupilot", "sitl-host") # Act first = obs.wait_for_outbound(timeout_s=5.0) second = obs.wait_for_outbound(timeout_s=5.0) # Assert assert first.lat_deg == 
48.275292 and first.lon_deg == 37.385220 assert first.image_id == "AD000001.jpg" assert second.lat_deg == 48.275001 and second.lon_deg == 37.382922 assert second.image_id == "AD000002.jpg" def test_wait_for_outbound_null_entry_raises_timeout(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": [None]}, ) obs = so.get_observer("ardupilot", "sitl-host") # Assert with pytest.raises(TimeoutError, match="captured as timeout in fixture"): obs.wait_for_outbound(timeout_s=5.0) def test_wait_for_outbound_advances_cursor_past_timeout(replay_dir: Path): # Arrange — a real timeout in the middle of the sequence does not stall # the cursor; the next call advances normally. _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", { "messages": [ {"lat_deg": 1.0, "lon_deg": 2.0}, None, {"lat_deg": 3.0, "lon_deg": 4.0}, ] }, ) obs = so.get_observer("ardupilot", "sitl-host") # Act / Assert assert obs.wait_for_outbound().lat_deg == 1.0 with pytest.raises(TimeoutError): obs.wait_for_outbound() third = obs.wait_for_outbound() assert third.lat_deg == 3.0 and third.lon_deg == 4.0 def test_wait_for_outbound_exhausted_raises_runtime(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": [{"lat_deg": 1.0, "lon_deg": 2.0}]}, ) obs = so.get_observer("ardupilot", "sitl-host") obs.wait_for_outbound() # drain the only entry # Assert with pytest.raises(RuntimeError, match="outbound messages fixture exhausted"): obs.wait_for_outbound() def test_wait_for_outbound_missing_fixture_raises_runtime(replay_dir: Path): # Arrange — observer fixture present, outbound fixture missing. 
_write_observer_fixture(replay_dir, "ardupilot", "sitl-host") obs = so.get_observer("ardupilot", "sitl-host") # Assert with pytest.raises(RuntimeError, match="outbound_messages_ardupilot_sitl-host.json"): obs.wait_for_outbound() def test_wait_for_outbound_missing_env_raises_runtime(unset_replay_dir): # Arrange — observer dataclass constructed manually so we don't depend on env var # for the observer-fixture load. Verifies the outbound load itself respects the env. obs = so._FdrReplayObserver(fc_kind="ardupilot", host="sitl-host", _payload={}) # Assert with pytest.raises(RuntimeError, match="env var not set"): obs.wait_for_outbound() def test_wait_for_outbound_messages_not_list_raises_runtime(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": {"oops": "should be list"}}, ) obs = so.get_observer("ardupilot", "sitl-host") # Assert with pytest.raises(RuntimeError, match="`messages` must be a JSON list"): obs.wait_for_outbound() def test_wait_for_outbound_entry_wrong_type_raises_runtime(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": ["not-an-object"]}, ) obs = so.get_observer("ardupilot", "sitl-host") # Assert with pytest.raises(RuntimeError, match=r"messages\[0\] must be a JSON object or null"): obs.wait_for_outbound() def test_wait_for_outbound_entry_missing_coords_raises_runtime(replay_dir: Path): # Arrange _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": [{"image_id": "AD000001.jpg"}]}, ) obs = so.get_observer("ardupilot", "sitl-host") # Assert with pytest.raises(RuntimeError, match="missing required `lat_deg`/`lon_deg`"): obs.wait_for_outbound() def test_wait_for_outbound_image_id_optional(replay_dir: Path): # Arrange — 
entries without `image_id` are valid; consumer only needs coords. _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": [{"lat_deg": 10.0, "lon_deg": 20.0}]}, ) obs = so.get_observer("ardupilot", "sitl-host") # Act msg = obs.wait_for_outbound() # Assert assert msg.lat_deg == 10.0 and msg.lon_deg == 20.0 assert msg.image_id is None def test_wait_for_outbound_separate_observers_have_independent_cursors(replay_dir: Path): # Arrange — two observers built from the same fixture file must NOT share cursor. _write_observer_fixture(replay_dir, "ardupilot", "sitl-host") _write_json( replay_dir / "outbound_messages_ardupilot_sitl-host.json", {"messages": [{"lat_deg": 1.0, "lon_deg": 2.0}, {"lat_deg": 3.0, "lon_deg": 4.0}]}, ) # Act obs_a = so.get_observer("ardupilot", "sitl-host") obs_b = so.get_observer("ardupilot", "sitl-host") a_first = obs_a.wait_for_outbound() b_first = obs_b.wait_for_outbound() # Assert assert a_first.lat_deg == 1.0 assert b_first.lat_deg == 1.0 # prepare_sitl_* def test_prepare_sitl_cold_boot_no_op(tmp_path: Path): # Act — no env var set is fine for the no-op. 
so.prepare_sitl_cold_boot(host="ardupilot-sitl", fixture_path=tmp_path / "cb.json") def test_prepare_sitl_cold_boot_empty_host_raises(tmp_path: Path): # Assert with pytest.raises(RuntimeError, match="host must be non-empty"): so.prepare_sitl_cold_boot(host="", fixture_path=tmp_path / "cb.json") def test_prepare_sitl_cold_boot_none_fixture_path_raises(): # Assert with pytest.raises(RuntimeError, match="fixture_path is required"): so.prepare_sitl_cold_boot(host="ardupilot-sitl", fixture_path=None) # type: ignore[arg-type] def test_prepare_sitl_no_gps_no_op(): # Act so.prepare_sitl_no_gps(host="ardupilot-sitl") def test_prepare_sitl_no_gps_empty_host_raises(): # Assert with pytest.raises(RuntimeError, match="host must be non-empty"): so.prepare_sitl_no_gps(host="") # capture_ap_tlog def test_capture_ap_tlog_missing_env_raises(unset_replay_dir): # Assert with pytest.raises(RuntimeError, match="env var not set"): so.capture_ap_tlog(host="ardupilot-sitl", duration_s=1.0) def test_capture_ap_tlog_missing_file_raises(replay_dir: Path): # Assert with pytest.raises(RuntimeError, match="fixture not found"): so.capture_ap_tlog(host="ardupilot-sitl", duration_s=1.0) def test_capture_ap_tlog_returns_path(replay_dir: Path): # Arrange tlog = replay_dir / "ap_tlog_ardupilot-sitl.tlog" tlog.write_bytes(b"\x00\x01\x02") # Act out = so.capture_ap_tlog(host="ardupilot-sitl", duration_s=1.0) # Assert assert out == tlog def test_capture_ap_tlog_zero_duration_raises(): # Assert with pytest.raises(RuntimeError, match="duration_s must be positive"): so.capture_ap_tlog(host="x", duration_s=0) # capture_gcs_tlog def test_capture_gcs_tlog_missing_env_raises(unset_replay_dir): # Assert with pytest.raises(RuntimeError, match="env var not set"): so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0) def test_capture_gcs_tlog_missing_file_raises(replay_dir: Path): # Assert with pytest.raises(RuntimeError, match="fixture not found"): so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0) 
def test_capture_gcs_tlog_returns_path(replay_dir: Path):
    """The path of the pre-existing GCS tlog fixture is returned."""
    expected_path = replay_dir / "gcs_tlog_sitl-ardupilot.tlog"
    expected_path.write_bytes(b"\x00\x01\x02")

    returned = so.capture_gcs_tlog(host="sitl-ardupilot", duration_s=1.0)

    assert returned == expected_path


def test_capture_gcs_tlog_zero_duration_raises():
    """A zero duration is rejected."""
    with pytest.raises(RuntimeError, match="duration_s must be positive"):
        so.capture_gcs_tlog(host="x", duration_s=0)


# read_ap_parameter


def test_read_ap_parameter_returns_value(replay_dir: Path):
    """A stored parameter reads back by name; an unknown name reads back as None."""
    stored_parameters = {"EK3_SRC1_POSXY": 3, "GPS_TYPE": 14}
    _write_json(replay_dir / "ap_parameters_ardupilot-sitl.json", stored_parameters)

    assert so.read_ap_parameter(host="ardupilot-sitl", name="EK3_SRC1_POSXY") == 3
    assert so.read_ap_parameter(host="ardupilot-sitl", name="UNKNOWN") is None


def test_read_ap_parameter_missing_file_raises(replay_dir: Path):
    """Reading a parameter with no parameters fixture on disk raises `RuntimeError`."""
    with pytest.raises(RuntimeError, match="required fixture not found"):
        so.read_ap_parameter(host="ardupilot-sitl", name="ANY")


# observe_inav_tcp_handshake


def test_observe_inav_tcp_handshake_returns_record(replay_dir: Path):
    """A numeric `established_within_s` is exposed on the returned report."""
    handshake_payload = {"established_within_s": 2.1}
    _write_json(replay_dir / "inav_handshake_inav-sitl_5760.json", handshake_payload)

    handshake = so.observe_inav_tcp_handshake(host="inav-sitl", port=5760, timeout_s=5.0)

    assert handshake.established_within_s == pytest.approx(2.1)


def test_observe_inav_tcp_handshake_null_established(replay_dir: Path):
    """A `null` `established_within_s` is exposed as None on the returned report."""
    # Arrange — handshake did NOT establish within window.
    handshake_payload = {"established_within_s": None}
    _write_json(replay_dir / "inav_handshake_inav-sitl_5760.json", handshake_payload)

    handshake = so.observe_inav_tcp_handshake(host="inav-sitl", port=5760, timeout_s=5.0)

    assert handshake.established_within_s is None


def test_observe_inav_tcp_handshake_zero_timeout_raises():
    """A zero timeout is rejected."""
    with pytest.raises(RuntimeError, match="timeout_s must be positive"):
        so.observe_inav_tcp_handshake(host="x", port=1, timeout_s=0)


def test_observe_inav_tcp_handshake_bad_value_type_raises(replay_dir: Path):
    """A string `established_within_s` is rejected."""
    handshake_payload = {"established_within_s": "not-a-number"}
    _write_json(replay_dir / "inav_handshake_inav-sitl_5760.json", handshake_payload)

    with pytest.raises(RuntimeError, match="must be a number or null"):
        so.observe_inav_tcp_handshake(host="inav-sitl", port=5760, timeout_s=5.0)


# collect_inav_msp_frames


def test_collect_inav_msp_frames_round_trip(replay_dir: Path):
    """Frames and `expected_num_sat` written to the fixture are returned in the capture."""
    frames_payload = {
        "frames": [
            {"monotonic_ms": 0, "function_id": 0x1F03},
            {"monotonic_ms": 200, "function_id": 0x1F03},
        ],
        "expected_num_sat": 12,
    }
    _write_json(replay_dir / "inav_msp_frames_inav-sitl_5760.json", frames_payload)

    captured = so.collect_inav_msp_frames(host="inav-sitl", port=5760, window_s=60.0)

    assert captured.expected_num_sat == 12
    assert len(captured.frames) == 2
    assert captured.frames[1].function_id == 0x1F03


def test_collect_inav_msp_frames_missing_expected_num_sat_raises(replay_dir: Path):
    """A fixture without `expected_num_sat` is rejected."""
    frames_payload = {"frames": []}
    _write_json(replay_dir / "inav_msp_frames_inav-sitl_5760.json", frames_payload)

    with pytest.raises(RuntimeError, match="`expected_num_sat` must be an int"):
        so.collect_inav_msp_frames(host="inav-sitl", port=5760, window_s=60.0)


def test_collect_inav_msp_frames_malformed_frame_raises(replay_dir: Path):
    """A frame whose only field is a string `monotonic_ms` is rejected as malformed."""
    frames_payload = {"frames": [{"monotonic_ms": "bad"}], "expected_num_sat": 12}
    _write_json(replay_dir / "inav_msp_frames_inav-sitl_5760.json", frames_payload)

    with pytest.raises(RuntimeError, match="malformed frame"):
        so.collect_inav_msp_frames(host="inav-sitl", port=5760, window_s=60.0)


# query_inav_gps_state


def test_query_inav_gps_state_round_trip(replay_dir: Path):
    """All three fixture fields are exposed on the returned state."""
    gps_payload = {"fix_type": 3, "num_sat": 14, "provider": "MSP"}
    _write_json(replay_dir / "inav_gps_state_inav-sitl.json", gps_payload)

    gps = so.query_inav_gps_state(host="inav-sitl")

    assert gps.fix_type == 3
    assert gps.num_sat == 14
    assert gps.provider == "MSP"


def test_query_inav_gps_state_missing_field_raises(replay_dir: Path):
    """A fixture lacking `provider` is rejected."""
    gps_payload = {"fix_type": 3, "num_sat": 14}
    _write_json(replay_dir / "inav_gps_state_inav-sitl.json", gps_payload)

    with pytest.raises(RuntimeError, match="iNav GPS state fixture"):
        so.query_inav_gps_state(host="inav-sitl")


def test_query_inav_gps_state_missing_env_raises(unset_replay_dir):
    """Querying without the env var raises `RuntimeError`."""
    with pytest.raises(RuntimeError, match="env var not set"):
        so.query_inav_gps_state(host="inav-sitl")