问题描述
我在本地Windows 10上的Spark 3.0.1上使用Pyspark进行测试和开发,无论尝试什么,生成的进程数始终为200,这对于我的小型测试用例来说太多了。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_test").master("local")\
.config('spark.shuffle.partitions','16')\
.config('spark.adaptive.enabled','True')\
.config("spark.adaptive.coalescepartitions.enabled","True").getorCreate()
稍后进行print(spark.sparkContext._conf.getAll())
表示参数已正确设置(主机由我检查):
[('spark.master','local'),('spark.driver.host','**************'),('spark.app.name','pyspark_test'),('spark.adaptive.enabled','True'),('spark.rdd.compress',('spark.adaptive.coalescepartitions.enabled',('spark.driver.port','58352'),('spark.serializer.objectStreamReset','100'),('spark.submit.pyFiles',''),('spark.shuffle.partitions','16'),('spark.executor.id','driver'),('spark.submit.deployMode','client'),('spark.app.id','local-1602571079244')]
我正在控制台中使用spark-submit
执行任务,因此应该使用给定的配置新建每个SparkSession。
我的代码在末尾包含一个groupBy
,一个内部join
和一个write.csv
。 csv输出是这里的主要问题。
当我在编写csv之前执行coalesce(1)
时,需要3分钟才能将200条数据收集到其中,输出csv的大小为338KB。在“阶段概述”中,我可以看到它同时执行200个任务时只能并行运行2个任务。没有它,它将仅写入200个单独的csv文件,每个文件2KB,大约需要3分钟。
我的输入数据是两个csv文件,大小分别为3.8MB和826KB。
在启用和未启用自适应优化的情况下,我都尝试过此操作,但感觉我的设置还是被忽略了。
我知道this related question,但这是三年半以前的V1.6版。
我也做了实验,首先创建一个SparkContext,设置并获取一个conf,停止SparkContext并将conf用于我的SparkSession,但这都没有帮助。
所以我的简单问题是:为什么我的spark.shuffle.partitions
设置被忽略,我该如何解决?
解决方法
我现在确实有点愚蠢。
我需要设置spark.sql.shuffle.partitions
而不是spark.shuffle.partitions
。
我期望Spark在获取不存在的设置时抛出错误,当这种情况没有发生时,我认为还可以。