Launching a multi-node Ray cluster on AWS and submitting a simple python script to run in a conda env

Problem description

What I have found with ray is a lack of documentation around autoscaling, and configs that break easily with no clear reason why.

My first idea was to pull a docker image to the cluster and run it, but the way ray handles docker images is very different from pulling an image on a remote machine and running it yourself, even when using the basic rayproject images in the docker file. I gave up on that approach.
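
(For reference, the autoscaler expects the image to be declared in a docker section of the cluster YAML rather than pulled by hand; a minimal sketch, assuming one of the published rayproject images:)

docker:
    image: rayproject/ray:latest  # assumption: any image with ray preinstalled
    container_name: ray_container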

So instead I am trying another solution: pull my pipeline from git, install my dependencies in a conda environment, and then submit my script.py job to run against the cluster.
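
The basic workflow for that approach would look something like this (with cluster.yaml standing in for the config discussed below):

$ ray up cluster.yaml                  # launch the head node and workers
$ ray submit cluster.yaml script.py    # rsync the script to the head node and run it
$ ray down cluster.yaml                # tear the cluster down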

The only autoscaling example I can get to work is the minimum_cluster.yaml config, where the workers do show up as launching on aws. But that on its own isn't useful, because I need a load of dependencies installed on the cluster to run a more complex script.

cluster_name: minimal

initial_workers: 3
min_workers: 3
max_workers: 3

provider:
    type: aws
    region: eu-west-2

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

worker_nodes:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

As soon as I try to add any complexity, the default settings get overwritten and no workers are initialized, even though ray says in the terminal that the cluster has launched. (Running the python script doesn't start any workers either.)

What I want is to launch a cluster, create a conda env, install my dependencies into the conda env, and run a python script across the whole cluster, with the workers actually showing up as initialized in my aws ec2 dashboard.

For example, something like this:

cluster_name: ray_cluster

min_workers: 8
max_workers: 8

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-east-2
    # availability_zone: us-west-2b

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: ami-07c1207a9d40bc3bd  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50

worker_nodes:
    InstanceType: c4.2xlarge
    ImageId: ami-07c1207a9d40bc3bd  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50



# List of shell commands to run to set up nodes.
setup_commands:
    # Consider uncommenting these if you run into dpkg locking issues
    # - sudo pkill -9 apt-get || true
    # - sudo pkill -9 dpkg || true
    # - sudo dpkg --configure -a
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential
    - sudo apt-get install curl
    - sudo apt-get install unzip
    # Install Node.js in order to build the dashboard.
    - curl -sL https://deb.nodesource.com/setup_12.x | sudo -E bash
    - sudo apt-get install -y nodejs
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Build  env

    - git clone pipline
    
    - conda create --name ray_env
    - conda activate ray_env
    - conda install --name ray_env pip
    - pip install --upgrade pip
    - pip install ray[all]
    - conda env update -n ray_env --file conda_env.yaml
    - conda install xgboost

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - conda activate ray_env

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands:
    - conda activate ray_env
        
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

The script I am trying to run is this:

import os
import ray
import time
import sklearn
import xgboost
from xgboost.sklearn import XGBClassifier



def printer():
    print("INSIDE WORKER " + str(time.time()) +"  PID  :    "+  str(os.getpid()))


# decorators allow for futures to be created for parallelization
@ray.remote        
def func_1():
    model = XGBClassifier()
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count
        
        
@ray.remote        
def func_2():
    #model = XGBClassifier()
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count

    
@ray.remote
def func_3():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count

def main():
    model = XGBClassifier()

    start = time.time()
    results = []
    
    ray.init(address='auto')
    # append function futures
    for i in range(1000):
        results.append(func_1.remote())
        results.append(func_2.remote())
        results.append(func_3.remote())
        
    # run in parallel and get the aggregated list
    a = ray.get(results)
    b = 0
    
    #add all values in list together
    for j in range(len(a)):
        b += a[j]
    print(b)
    
    #time to complete
    end = time.time()
    print(end - start)
    
    
if __name__ == '__main__':
    main()

and running:

ray submit cluster_minimal.yml ray_test.py -start -- --ray-address='xx.31.xx.xx:6379'

Any help, or anyone who can show me how to do this, I would be forever grateful. A simple template that works would be incredibly useful, because nothing I try works. If not, I may have to move to pyspark or something similar, which would be a shame, because the decorators and actors ray uses are a really nice way of doing things.

Solution

Thanks for raising this question. Your feedback is really important to us. Next time you run into a problem, please open an issue on our github repository (https://github.com/ray-project/ray/issues/new/choose) with reproduction code and the output you are seeing, so we can track the problem and it doesn't get lost. We would also love to improve the autoscaler documentation. Could you share more about what you were trying to find out and how we could improve it?

As for your question: I copy-pasted your files and ran exactly what you ran, using the latest ray nightly (https://docs.ray.io/en/master/installation.html#daily-releases-nightlies). The only difference is that I ran "ray submit cluster_minimal.yml ray_test.py --start" (without the ray address, and with two dashes on --start; I'm not sure what it means to provide a ray address before the cluster has started).

Ray prints a clear error:

    (9/18) git clone pipline
    fatal: repository 'pipline' does not exist
    Shared connection to 3.5.zz.yy closed.
      New status: update-failed
      !!!
      SSH command failed.
      !!!

      Failed to setup head node.

It looks like you are trying to call git clone pipline, but I'm not sure what you expected it to do. Could you try the latest ray nightly and post here the output you get and which version of ray you are using?
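
If the intent was to clone your pipeline repository during node setup, the command needs a real URL; a hedged sketch (the URL below is a placeholder for your actual repository):

setup_commands:
    # placeholder URL; '|| true' keeps setup re-runs from failing if the clone already exists
    - git clone https://github.com/<your-org>/<your-pipeline>.git || true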


Update: I have run the following and installed xgboost into the conda env on the cluster, and the setup below has been successful, in the sense that import xgboost is found on the cluster when running the ray_test.py below (it has been changed). I ran it as Ameer suggested with the command below, using ray 1.0.0 and python 3.6.12 :: Anaconda, Inc. So that answers that part.

What it does not do is start the workers, which never appear under aws ec2 --> Instances. No workers can be found. Could someone please suggest why it starts only the head and no workers?
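
One way to diagnose this is to read the autoscaler's logs on the head node, which record every attempt to launch and set up a worker (the log paths below assume Ray's default session directory):

$ ray exec cluster.yaml 'tail -n 100 /tmp/ray/session_latest/logs/monitor.out'
$ ray exec cluster.yaml 'tail -n 100 /tmp/ray/session_latest/logs/monitor.err'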

$ ray submit cluster.yaml ray_test.py --start

Here is the updated config:

cluster_name: ray_cluster

min_workers: 3
max_workers: 3

# Cloud-provider specific configuration.
provider:
    type: aws
    region: eu-west-2
    # availability_zone: us-west-2b

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

worker_nodes:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.


# List of shell commands to run to set up nodes.
setup_commands:
    # Consider uncommenting these if you run into dpkg locking issues
#    - sudo pkill -9 apt-get || true
#    - sudo pkill -9 dpkg || true
#    - sudo dpkg --configure -a
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential
    - sudo apt-get install curl
    - sudo apt-get install unzip
    # Install Node.js in order to build the dashboard.
    - curl -sL https://deb.nodesource.com/setup_12.x | sudo -E bash
    - sudo apt-get install -y nodejs
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Build  env

    - conda create --name ray_env
    - conda activate ray_env
    - conda install --name ray_env pip
    - pip install --upgrade pip
    - pip install ray[all]
    - conda install -c conda-forge xgboost


# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - source activate ray_env

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands:
    - source activate ray_env

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379


# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
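
One likely culprit: conda activate tends to fail in the non-interactive shells the autoscaler runs setup commands in (and with the conda 4.3 that ships in Anaconda3-5.0.1, conda activate does not exist yet; source activate was the older form), and activation in one setup command does not carry over to the next. That means pip install ray[all] can land in the base environment while the workers later try to run ray start from an env that has no ray in it. A hedged variation that sidesteps activation by calling the env's own binaries directly (paths assume Anaconda installed at ~/anaconda3, as above):

setup_commands:
    # create the env with its own python so it gets its own pip
    - conda create -y --name ray_env python=3.6 || true
    # install into ray_env explicitly instead of relying on activation
    - ~/anaconda3/envs/ray_env/bin/pip install --upgrade pip
    - ~/anaconda3/envs/ray_env/bin/pip install 'ray[all]' xgboost
    # make interactive ssh sessions land in the env too
    - echo 'source activate ray_env' >> ~/.bashrc

head_start_ray_commands:
    - ~/anaconda3/envs/ray_env/bin/ray stop
    - ulimit -n 65536; ~/anaconda3/envs/ray_env/bin/ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

worker_start_ray_commands:
    - ~/anaconda3/envs/ray_env/bin/ray stop
    - ulimit -n 65536; ~/anaconda3/envs/ray_env/bin/ray start --address=$RAY_HEAD_IP:6379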

Here is the updated ray_test.py:

# This imports successfully with the config and conda env
import os
import ray
import time
import xgboost




def printer():
    print("INSIDE WORKER " + str(time.time()) +"  PID  :    "+  str(os.getpid()))


# decorators allow for futures to be created for parallelization
@ray.remote        
def func_1():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count
        
        
@ray.remote        
def func_2():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count

    
@ray.remote
def func_3():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count

def main():
    start = time.time()
    results = []
    
    ray.init(address='auto')
    # append function futures
    for i in range(1000):
        results.append(func_1.remote())
        results.append(func_2.remote())
        results.append(func_3.remote())
        
    # run in parallel and get the aggregated list
    a = ray.get(results)
    b = 0
    
    #add all values in list together
    for j in range(len(a)):
        b += a[j]
    print(b)
    
    #time to complete
    end = time.time()
    print(end - start)
    
    
if __name__ == '__main__':
    main()
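
Once a job is running, a quick sanity check from inside python shows whether any workers actually joined; ray.nodes() returns one entry per node in the cluster:

import ray

ray.init(address='auto')
# one dict per node; 'Alive' is False for nodes that have dropped out
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Alive"])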