问题描述
我是新来的火花。我正在使用以下配置集在Spark独立版本(v3.0.0)中编写机器学习算法:
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.set("spark.driver.memory","8g");
conf.set("spark.driver.maxResultSize","8g");
conf.set("spark.memory.fraction","0.6");
conf.set("spark.memory.storageFraction","0.5");
conf.set("spark.sql.shuffle.partitions","5");
conf.set("spark.memory.offheap.enabled","false");
conf.set("spark.reducer.maxSizeInFlight","96m");
conf.set("spark.shuffle.file.buffer","256k");
conf.set("spark.sql.debug.maxToStringFields","100");
这就是我创建CrossValidator的方式
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(gbt.maxBins(),new int[]{50})
.addGrid(gbt.maxDepth(),new int[]{2,5,10})
.addGrid(gbt.maxIter(),new int[]{5,20,40})
.addGrid(gbt.minInfoGain(),new double[]{0.0d,.1d,.5d})
.build();
CrossValidator gbcv = new CrossValidator()
.setEstimator(gbt)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(gbevaluator)
.setNumFolds(5)
.setParallelism(8)
.setSeed(session.getArguments().getTrainingRandom());
问题是,当(在paramGrid中)maxDepth仅为{2,5}而maxIter {5,20}都可以正常工作时,但是就像上面的代码中那样,它会继续记录日志:
WARN DAGScheduler: broadcasting large task binary with size xx
,
xx从1000 KiB到2.9 MiB,通常会导致超时异常
我应该更改哪些火花参数以避免这种情况?
解决方法
对于超时问题,请考虑更改以下配置:
spark.sql.autoBroadcastJoinThreshold 为 -1。
这将取消广播大小的限制,即 10MB。
,对我有用的解决方案是:
减少任务大小 => 减少其处理的数据
首先,通过 df.rdd.getNumPartitions()
检查数据帧中的分区数
之后,增加分区:df.repartition(100)