from e2e.replay.harness import mavlink_source_is_authorized
from mavlink_gcs_integration import InMemoryMavlinkGateway, OperatorStatusMessage
from safety_anchor_wrapper import SafetyAnchorStateMachine, SafetyStateConfig, TelemetryContext
from shared.contracts import VioStatePacket


def test_blackout_trace_transitions_to_dead_reckoned_then_no_fix() -> None:
    """Five blackout propagations start as dead-reckoned and end as no-fix.

    Also checks that the reported covariance semi-major axis never shrinks
    along the trace and that the final estimate carries fix type 0 with a
    horizontal accuracy of at least 999.0.
    """
    # Arrange
    step_ns = 1_000_000_000
    config = SafetyStateConfig(
        initial_covariance_m=2.0,
        dead_reckoning_growth_m=125.0,
        no_fix_covariance_threshold_m=500.0,
    )
    anchor = SafetyAnchorStateMachine(config)
    last_vio_packet = VioStatePacket(
        timestamp_ns=step_ns,
        relative_pose={"x_m": 0.0},
        velocity_mps=(0.0, 0.0, 0.0),
        tracking_quality=0.9,
        covariance_hint=[[2.0, 0.0], [0.0, 2.0]],
    )
    telemetry = TelemetryContext(
        timestamp_ns=step_ns,
        latitude_hint_deg=48.0,
        longitude_hint_deg=37.0,
        altitude_m=400.0,
    )
    anchor.update_vio(last_vio_packet, telemetry)

    # Act: propagate at five timestamps, each one step_ns after the previous,
    # starting one step_ns after the VIO packet above.
    trace = [anchor.propagate_blackout(step_ns + step * step_ns) for step in range(1, 6)]

    # Assert
    first_snapshot = trace[0]
    final_snapshot = trace[-1]
    assert first_snapshot.mode == "dead_reckoned"
    assert final_snapshot.mode == "no_fix"
    # A sequence equal to its sorted copy is non-decreasing.
    semimajor_axes = [entry.estimate.covariance_semimajor_m for entry in trace]
    assert semimajor_axes == sorted(semimajor_axes)
    assert final_snapshot.estimate.fix_type == 0
    assert final_snapshot.estimate.horizontal_accuracy_m >= 999.0


def test_no_fix_estimate_is_not_emitted_as_confident_gps_input() -> None:
    """The gateway refuses to emit a post-blackout estimate as GPS input.

    The refusal must be reported through ``emission.error`` with a message
    containing "unsafe for GPS_INPUT".
    """
    # Arrange
    config = SafetyStateConfig(
        dead_reckoning_growth_m=600.0,
        no_fix_covariance_threshold_m=500.0,
    )
    anchor = SafetyAnchorStateMachine(config)
    gateway = InMemoryMavlinkGateway(status_rate_limit_ns=1_000_000_000)
    last_vio_packet = VioStatePacket(
        timestamp_ns=1,
        relative_pose={"x_m": 0.0},
        velocity_mps=(0.0, 0.0, 0.0),
        tracking_quality=0.5,
    )
    telemetry = TelemetryContext(
        timestamp_ns=1,
        latitude_hint_deg=48.0,
        longitude_hint_deg=37.0,
        altitude_m=400.0,
    )
    anchor.update_vio(last_vio_packet, telemetry)
    # The configured growth (600.0) exceeds the no-fix threshold (500.0);
    # presumably a single propagation is therefore enough to reach no-fix.
    degraded_snapshot = anchor.propagate_blackout(2)

    # Act
    emission = gateway.emit_gps_input(degraded_snapshot.estimate)

    # Assert
    assert emission.emitted is False
    assert emission.error is not None
    assert "unsafe for GPS_INPUT" in emission.error.message


def test_unauthorized_mavlink_sources_are_rejected_by_test_assertion() -> None:
    """The authorization helper accepts a listed system id and rejects an unlisted one."""
    # Arrange
    permitted_ids = {1, 42}
    expectations = ((42, True), (99, False))

    # Act / Assert
    for system_id, expected in expectations:
        assert mavlink_source_is_authorized(system_id, permitted_ids) is expected


def test_qgc_status_and_fdr_evidence_are_visible_and_rate_limited() -> None:
    """Of three status messages, two are emitted, one is suppressed, all emitted are QGC-visible."""
    # Arrange
    gateway = InMemoryMavlinkGateway(status_rate_limit_ns=2_000_000_000)
    scripted = (
        (1_000_000_000, "warning", "VISUAL_BLACKOUT_IMU_ONLY"),
        (2_000_000_000, "warning", "VISUAL_BLACKOUT_IMU_ONLY"),
        (4_000_000_000, "critical", "VISUAL_BLACKOUT_FAILSAFE"),
    )
    messages = [
        OperatorStatusMessage(timestamp_ns=stamp_ns, severity=level, text=body)
        for stamp_ns, level, body in scripted
    ]

    # Act
    result = gateway.emit_status(messages)

    # Assert
    emitted_texts = [entry.text for entry in result.emitted]
    assert emitted_texts == ["VISUAL_BLACKOUT_IMU_ONLY", "VISUAL_BLACKOUT_FAILSAFE"]
    assert len(result.suppressed) == 1
    assert all(entry.visible_to_qgc for entry in result.emitted)