在单台计算机上使用pyspark设置任务插槽

问题描述

我正在尝试使用SparkTrials库中的hyperopt运行ML模型的优化。我在具有16个内核的单台计算机上运行此命令,但是当我运行以下将内核数设置为8的代码时,我收到一条警告,似乎表明仅使用了一个内核。

SparkTrials接受一个参数spark_session作为参数,从理论上讲,这是我设置内核数的地方。

有人可以帮助我吗?

谢谢!

import os,shutil,tempfile
from hyperopt import fmin,tpe,hp,SparkTrials,STATUS_OK
import numpy as np
from sklearn import linear_model,datasets,model_selection
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").config('spark.local.dir','./').config("spark.executor.cores",8).getOrCreate()

def gen_data(bytes):
  """
  Generates train/test data with target total bytes for a random regression problem.
  Returns (X_train,X_test,y_train,y_test).
  """
  n_features = 100
  n_samples = int(1.0 * bytes / (n_features + 1) / 8)
  X,y = datasets.make_regression(n_samples=n_samples,n_features=n_features,random_state=0)
  return model_selection.train_test_split(X,y,test_size=0.2,random_state=1)

def train_and_eval(data,alpha):
  """
  Trains a LASSO model using training data with the input alpha and evaluates it using test data.
  """
  X_train,y_test = data  
  model = linear_model.Lasso(alpha=alpha)
  model.fit(X_train,y_train)
  loss = model.score(X_test,y_test)
  return {"loss": loss,"status": STATUS_OK}

def tune_alpha(objective):
  """
  Uses Hyperopt's SparkTrials to tune the input objective,which takes alpha as input and returns loss.
  Returns the best alpha found.
  """
  best = fmin(
    fn=objective,space=hp.uniform("alpha",0.0,10.0),algo=tpe.suggest,max_evals=8,trials=SparkTrials(parallelism=8,spark_session=spark))
  return best["alpha"]

data_small = gen_data(10 * 1024 * 1024)  # ~10MB

def objective_small(alpha):
  # For small data,you might reference it directly.
  return train_and_eval(data_small,alpha)

tune_alpha(objective_small)

并行性(8)大于当前Spark任务槽的总数 (1)。如果启用了动态分配,则您可能会看到更多执行者 已分配。

解决方法

如果您在集群中: Spark命名法中的核心与CPU中的物理核心无关,这里spark.executor.cores您指定了每个执行器的最大线程数(=任务)如果您想增加必须在命令行中使用--num-executors或在代码中使用spark.executor.instances配置属性的执行程序的数量(此处有一个),则可以运行8个。

如果您在纱线簇中,我建议尝试使用这种配置

spark.conf.set("spark.dynamicAllocation.enabled","true")
spark.conf.set("spark.executor.cores",4)
spark.conf.set("spark.dynamicAllocation.minExecutors","2")
spark.conf.set("spark.dynamicAllocation.maxExecutors","10")

请考虑以上选项在本地模式下不可用

本地:在本地模式下,您只有一个执行程序,并且如果要更改其工作线程数(默认情况下为一个),则必须像这样{{1 }}或local[*]

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...