问题描述
我正在尝试使用SparkTrials
库中的hyperopt
运行ML模型的优化。我在具有16个内核的单台计算机上运行此命令,但是当我运行以下将内核数设置为8的代码时,我收到一条警告,似乎表明仅使用了一个内核。
SparkTrials接受一个参数spark_session
作为参数,从理论上讲,这是我设置内核数的地方。
有人可以帮助我吗?
谢谢!
import os,shutil,tempfile
from hyperopt import fmin,tpe,hp,SparkTrials,STATUS_OK
import numpy as np
from sklearn import linear_model,datasets,model_selection
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").config('spark.local.dir','./').config("spark.executor.cores",8).getOrCreate()
def gen_data(bytes):
"""
Generates train/test data with target total bytes for a random regression problem.
Returns (X_train,X_test,y_train,y_test).
"""
n_features = 100
n_samples = int(1.0 * bytes / (n_features + 1) / 8)
X,y = datasets.make_regression(n_samples=n_samples,n_features=n_features,random_state=0)
return model_selection.train_test_split(X,y,test_size=0.2,random_state=1)
def train_and_eval(data,alpha):
"""
Trains a LASSO model using training data with the input alpha and evaluates it using test data.
"""
X_train,y_test = data
model = linear_model.Lasso(alpha=alpha)
model.fit(X_train,y_train)
loss = model.score(X_test,y_test)
return {"loss": loss,"status": STATUS_OK}
def tune_alpha(objective):
"""
Uses Hyperopt's SparkTrials to tune the input objective,which takes alpha as input and returns loss.
Returns the best alpha found.
"""
best = fmin(
fn=objective,space=hp.uniform("alpha",0.0,10.0),algo=tpe.suggest,max_evals=8,trials=SparkTrials(parallelism=8,spark_session=spark))
return best["alpha"]
data_small = gen_data(10 * 1024 * 1024) # ~10MB
def objective_small(alpha):
# For small data,you might reference it directly.
return train_and_eval(data_small,alpha)
tune_alpha(objective_small)
并行性(8)大于当前Spark任务槽的总数 (1)。如果启用了动态分配,则您可能会看到更多执行者 已分配。
解决方法
如果您在集群中: Spark命名法中的核心与CPU中的物理核心无关,这里spark.executor.cores
您指定了每个执行器的最大线程数(=任务)如果您想增加必须在命令行中使用--num-executors
或在代码中使用spark.executor.instances
配置属性的执行程序的数量(此处有一个),则可以运行8个。
如果您在纱线簇中,我建议尝试使用这种配置
spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("spark.executor.cores",4)
spark.conf.set("spark.dynamicAllocation.minExecutors","2")
spark.conf.set("spark.dynamicAllocation.maxExecutors","10")
请考虑以上选项在本地模式下不可用
本地:在本地模式下,您只有一个执行程序,并且如果要更改其工作线程数(默认情况下为一个),则必须像这样{{1 }}或local[*]