Files
gps-denied-onboard/tests/blackbox/test_blackout_spoofing.py
T
Oleksandr Bezdieniezhnykh 5acd14b792 [AZ-234] [AZ-235] [AZ-236] [AZ-237] Add replay tests
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-05 06:24:10 +03:00

118 lines
3.8 KiB
Python

from e2e.replay.harness import mavlink_source_is_authorized
from mavlink_gcs_integration import InMemoryMavlinkGateway, OperatorStatusMessage
from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext
from shared.contracts import VioStatePacket
def test_blackout_trace_transitions_to_dead_reckoned_then_no_fix() -> None:
    """Propagate a five-step blackout and check the resulting mode trace.

    One VIO update is fed in at timestamp 1_000_000_000, then the state
    machine is propagated through blackout timestamps 2_000_000_000 to
    6_000_000_000 in steps of 1_000_000_000. The first snapshot must report
    ``dead_reckoned``, the last must report ``no_fix``, and the covariance
    semi-major values must be in non-decreasing order across the trace.
    """
    # Arrange
    start_ns = 1_000_000_000
    step_ns = 1_000_000_000
    config = SafetyStateConfig(
        initial_covariance_m=2.0,
        dead_reckoning_growth_m=125.0,
        no_fix_covariance_threshold_m=500.0,
    )
    state_machine = SafetyAnchorStateMachine(config)
    vio_packet = VioStatePacket(
        timestamp_ns=start_ns,
        relative_pose={"x_m": 0.0},
        velocity_mps=(0.0, 0.0, 0.0),
        tracking_quality=0.9,
        covariance_hint=[[2.0, 0.0], [0.0, 2.0]],
    )
    telemetry = TelemetryContext(
        timestamp_ns=start_ns,
        latitude_hint_deg=48.0,
        longitude_hint_deg=37.0,
        altitude_m=400.0,
    )
    state_machine.update_vio(vio_packet, telemetry)

    # Act
    snapshots = []
    for step in range(1, 6):
        snapshots.append(state_machine.propagate_blackout(start_ns + step * step_ns))

    # Assert
    first_snapshot = snapshots[0]
    final_snapshot = snapshots[-1]
    assert first_snapshot.mode == "dead_reckoned"
    assert final_snapshot.mode == "no_fix"

    # Comparing against the sorted copy asserts the values never decrease.
    covariances = [entry.estimate.covariance_semimajor_m for entry in snapshots]
    assert covariances == sorted(covariances)

    assert final_snapshot.estimate.fix_type == 0
    assert final_snapshot.estimate.horizontal_accuracy_m >= 999.0
def test_no_fix_estimate_is_not_emitted_as_confident_gps_input() -> None:
    """Check that the gateway refuses to emit a post-blackout estimate.

    The state machine is configured with a dead-reckoning growth (600.0)
    larger than the no-fix covariance threshold (500.0), receives one VIO
    update, and is propagated through a single blackout step. The estimate
    from that snapshot is handed to the gateway, which must report it as not
    emitted and attach an error mentioning "unsafe for GPS_INPUT".
    """
    # Arrange
    config = SafetyStateConfig(
        dead_reckoning_growth_m=600.0,
        no_fix_covariance_threshold_m=500.0,
    )
    state_machine = SafetyAnchorStateMachine(config)
    gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000_000_000)
    vio_packet = VioStatePacket(
        timestamp_ns=1,
        relative_pose={"x_m": 0.0},
        velocity_mps=(0.0, 0.0, 0.0),
        tracking_quality=0.5,
    )
    telemetry = TelemetryContext(
        timestamp_ns=1,
        latitude_hint_deg=48.0,
        longitude_hint_deg=37.0,
        altitude_m=400.0,
    )
    state_machine.update_vio(vio_packet, telemetry)
    no_fix_snapshot = state_machine.propagate_blackout(2)

    # Act
    emission = gateway.emit_gps_input(no_fix_snapshot.estimate)

    # Assert
    assert emission.emitted is False
    rejection = emission.error
    assert rejection is not None
    assert "unsafe for GPS_INPUT" in rejection.message
def test_unauthorized_mavlink_sources_are_rejected_by_test_assertion() -> None:
    """Check the replay-harness authorization helper against an allow-list.

    A source system id that is in the allowed set must yield exactly ``True``;
    an id that is absent from the set must yield exactly ``False``.
    """
    # Arrange
    allowlist = {1, 42}

    # Act / Assert
    listed_verdict = mavlink_source_is_authorized(42, allowlist)
    assert listed_verdict is True

    unlisted_verdict = mavlink_source_is_authorized(99, allowlist)
    assert unlisted_verdict is False
def test_qgc_status_and_fdr_evidence_are_visible_and_rate_limited() -> None:
    """Check status emission through a gateway with a 2_000_000_000 ns rate limit.

    Three operator status messages are submitted: two identical warnings at
    timestamps 1_000_000_000 and 2_000_000_000, then a critical message at
    4_000_000_000. The gateway must emit one warning followed by the critical
    message, suppress exactly one message (presumably the repeated warning;
    the test only checks the count), and flag every emitted message as
    visible to QGC.
    """
    # Arrange
    gateway = InMemoryMavlinkGateway(status_rate_limit_ns=2_000_000_000)
    # (timestamp_ns, severity, text) for each message, in submission order.
    schedule = (
        (1_000_000_000, "warning", "VISUAL_BLACKOUT_IMU_ONLY"),
        (2_000_000_000, "warning", "VISUAL_BLACKOUT_IMU_ONLY"),
        (4_000_000_000, "critical", "VISUAL_BLACKOUT_FAILSAFE"),
    )
    messages = [
        OperatorStatusMessage(timestamp_ns=stamp, severity=level, text=body)
        for stamp, level, body in schedule
    ]

    # Act
    result = gateway.emit_status(messages)

    # Assert
    emitted_texts = [entry.text for entry in result.emitted]
    assert emitted_texts == [
        "VISUAL_BLACKOUT_IMU_ONLY",
        "VISUAL_BLACKOUT_FAILSAFE",
    ]
    assert len(result.suppressed) == 1
    for entry in result.emitted:
        assert entry.visible_to_qgc