Problem description
What I have found with Ray is a lack of documentation around autoscaling, and configurations that break easily with no clear reason why.
My first idea was to pull a Docker image onto the cluster and run it, but the way Ray handles Docker images is very different from pulling an image onto a remote machine and running it, even when using the base rayproject images in the Dockerfile. I gave up on that approach.
So I am now trying another solution: pull my pipeline from git, install my dependencies into a conda environment, and then submit my script.py job to the cluster to run the pipeline.
The only autoscaling example I have been able to get working is the minimal_cluster.yaml config, where the workers do show up as launched on AWS. But that is not useful on its own, because I need a considerable number of dependencies installed on the cluster to run anything more complex.
cluster_name: minimal
initial_workers: 3
min_workers: 3
max_workers: 3

provider:
    type: aws
    region: eu-west-2

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

worker_nodes:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.
As soon as I try to add any complexity, the defaults get overridden to manual and no workers are ever initialized, even though the Ray cluster reports in the terminal that it has launched. (Running a Python script does not start any workers either.)
What I want is to launch a cluster, create a conda env, install my dependencies into that conda env, and run a Python script across the whole cluster, with the workers actually showing up as initialized in my AWS EC2 dashboard.
For example, something like this:
cluster_name: ray_cluster

min_workers: 8
max_workers: 8

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-east-2
    # availability_zone: us-west-2b

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: ami-07c1207a9d40bc3bd  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50

worker_nodes:
    InstanceType: c4.2xlarge
    ImageId: ami-07c1207a9d40bc3bd  # Default Ubuntu 16.04 AMI.

    # Set primary volume to 50 GiB
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 50

# List of shell commands to run to set up nodes.
setup_commands:
    # Consider uncommenting these if you run into dpkg locking issues
    # - sudo pkill -9 apt-get || true
    # - sudo pkill -9 dpkg || true
    # - sudo dpkg --configure -a
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential
    - sudo apt-get install curl
    - sudo apt-get install unzip
    # Install Node.js in order to build the dashboard.
    - curl -sL https://deb.nodesource.com/setup_12.x | sudo -E bash
    - sudo apt-get install -y nodejs
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Build env
    - git clone pipline
    - conda create --name ray_env
    - conda activate ray_env
    - conda install --name ray_env pip
    - pip install --upgrade pip
    - pip install ray[all]
    - conda env update -n ray_env --file conda_env.yaml
    - conda install xgboost

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - conda activate ray_env

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands:
    - conda activate ray_env

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
The script I am trying to run looks like this:
import os
import ray
import time
import sklearn
import xgboost
from xgboost.sklearn import XGBClassifier


def printer():
    print("INSIDE WORKER " + str(time.time()) + " PID : " + str(os.getpid()))


# decorators allow for futures to be created for parallelization
@ray.remote
def func_1():
    model = XGBClassifier()
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


@ray.remote
def func_2():
    #model = XGBClassifier()
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


@ray.remote
def func_3():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


def main():
    model = XGBClassifier()
    start = time.time()
    results = []

    ray.init(address='auto')

    # append function futures
    for i in range(1000):
        results.append(func_1.remote())
        results.append(func_2.remote())
        results.append(func_3.remote())

    # run in parallel and get aggregated list
    a = ray.get(results)

    # add all values in the list together
    b = 0
    for j in range(len(a)):
        b += a[j]
    print(b)

    # time to complete
    end = time.time()
    print(end - start)


if __name__ == '__main__':
    main()
and then running:
ray submit cluster_minimal.yml ray_test.py -start -- --ray-address='xx.31.xx.xx:6379'
I would be forever grateful for any help, or for anyone who can show me how to do this. A simple template that just works would be incredibly useful, because nothing I have tried so far has worked. If not, I may have to move to PySpark or something similar, which would be a shame, because Ray's decorators and actors are a really nice way of doing things.
Solution
Thanks for raising this question; your feedback is really important to us. Next time you run into a problem, please file an issue on our GitHub repository (https://github.com/ray-project/ray/issues/new/choose) along with the reproduction code and the output you are seeing, so we can track the problem and it does not get lost. We would also be glad to improve the autoscaler documentation. Could you share more about what you were looking for and how we can improve it?
As for your problem, I copy-pasted your files and ran exactly what you ran, using the latest Ray nightly (https://docs.ray.io/en/master/installation.html#daily-releases-nightlies). The only difference is that I ran: "ray submit cluster_minimal.yml ray_test.py --start" (without the Ray address, and with two dashes for --start; I am not sure what providing a Ray address before the cluster is launched is supposed to mean).
Ray prints a clear error:
(9/18) git clone pipline
fatal: repository 'pipline' does not exist
Shared connection to 3.5.zz.yy closed.
New status: update-failed
!!!
SSH command failed.
!!!
Failed to setup head node.
It looks like you are trying to call git clone pipline, but I am not sure what you expect that to do. Could you try the latest Ray nightly and post here what output you get and which version of Ray you are using?
I have now run the following command and installed xgboost into a conda env on the cluster, and with the setup below that part has succeeded: import xgboost is found on the cluster when running ray_test.py (which has been updated, see below). I ran it as Ameer suggested, with ray version 1.0.0 and Python 3.6.12 :: Anaconda, Inc. So that answers that part.
What it does not do is start the workers on the AWS EC2 instances: no workers can be found. Could someone please advise why it starts only the head node and no workers?
$ ray submit cluster.yaml ray_test.py --start
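To check whether any workers have actually joined the Ray cluster (as opposed to just appearing, or not, in the EC2 console), a small diagnostic script can be submitted the same way. This is only a sketch of my own using Ray's public API, not something from the thread, and the CPU counts it mentions assume c5.2xlarge instances (8 vCPUs each):

# check_cluster.py -- run with: ray submit cluster.yaml check_cluster.py
import ray

ray.init(address="auto")  # attach to the cluster started by `ray up` / `ray submit --start`

# One entry per node (head and workers) that has registered with the head.
alive = [n for n in ray.nodes() if n["Alive"]]
print("Nodes in the cluster:", len(alive))
for n in alive:
    print("  ", n["NodeManagerAddress"], n["Resources"].get("CPU", 0), "CPUs")

# With one head plus three c5.2xlarge workers this should eventually report
# about 32 CPUs in total (an assumption based on the instance type above).
print("Cluster resources:", ray.cluster_resources())

If this keeps reporting a single node, the worker instances were never launched; if it reports four nodes, the workers did join even if they were hard to spot in the console.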
Here is the updated config:
cluster_name: ray_cluster

min_workers: 3
max_workers: 3

# Cloud-provider specific configuration.
provider:
    type: aws
    region: eu-west-2
    # availability_zone: us-west-2b

auth:
    ssh_user: ubuntu

head_node:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

worker_nodes:
    InstanceType: c5.2xlarge
    ImageId: latest_dlami  # Default Ubuntu 16.04 AMI.

# List of shell commands to run to set up nodes.
setup_commands:
    # Consider uncommenting these if you run into dpkg locking issues
    # - sudo pkill -9 apt-get || true
    # - sudo pkill -9 dpkg || true
    # - sudo dpkg --configure -a
    # Install basics.
    - sudo apt-get update
    - sudo apt-get install -y build-essential
    - sudo apt-get install curl
    - sudo apt-get install unzip
    # Install Node.js in order to build the dashboard.
    - curl -sL https://deb.nodesource.com/setup_12.x | sudo -E bash
    - sudo apt-get install -y nodejs
    # Install Anaconda.
    - wget https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh || true
    - bash Anaconda3-5.0.1-Linux-x86_64.sh -b -p $HOME/anaconda3 || true
    - echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.bashrc
    # Build env
    - conda create --name ray_env
    - conda activate ray_env
    - conda install --name ray_env pip
    - pip install --upgrade pip
    - pip install ray[all]
    - conda install -c conda-forge xgboost

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - source activate ray_env

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands:
    - source activate ray_env

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --head --port=6379 --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
Here is the updated ray_test.py:
# These imports succeed with this config and conda env
import os  # needed for os.getpid() in printer()
import ray
import time
import xgboost


def printer():
    print("INSIDE WORKER " + str(time.time()) + " PID : " + str(os.getpid()))


# decorators allow for futures to be created for parallelization
@ray.remote
def func_1():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


@ray.remote
def func_2():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


@ray.remote
def func_3():
    count = 0
    for i in range(100000000):
        count += 1
    printer()
    return count


def main():
    start = time.time()
    results = []

    ray.init(address='auto')

    # append function futures
    for i in range(1000):
        results.append(func_1.remote())
        results.append(func_2.remote())
        results.append(func_3.remote())

    # run in parallel and get aggregated list
    a = ray.get(results)

    # add all values in the list together
    b = 0
    for j in range(len(a)):
        b += a[j]
    print(b)

    # time to complete
    end = time.time()
    print(end - start)


if __name__ == '__main__':
    main()
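A closing note of my own rather than from the thread: ray.init(address="auto") returns as soon as the head node is reachable, so the 3000 tasks above can all sit queued on the head while worker instances are still (or never) being provisioned. Below is a hedged sketch that waits for a minimum amount of CPU before submitting work; the 32-CPU target is an assumption for one head plus three c5.2xlarge workers at 8 vCPUs each:

import time
import ray

ray.init(address="auto")

EXPECTED_CPUS = 32   # assumed: 4 x c5.2xlarge at 8 vCPUs each; adjust to your instances
TIMEOUT_S = 600      # give the autoscaler up to 10 minutes to launch workers

deadline = time.time() + TIMEOUT_S
while time.time() < deadline:
    cpus = ray.cluster_resources().get("CPU", 0)
    print("CPUs currently in the cluster:", cpus)
    if cpus >= EXPECTED_CPUS:
        break
    time.sleep(10)
else:
    # Workers never joined; the autoscaler logs on the head node
    # (/tmp/ray/session_latest/logs/monitor*.log) usually say why.
    print("Timed out waiting for workers to join.")

This does not by itself explain why the instances are missing from the EC2 console, but it makes it obvious from the driver whether and when workers actually join the cluster.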