问题描述
问题:
我一直致力于使用 Pyspark 和 Spark ml 库分发 CrossValidation 过程,因此与常规顺序计算(即 scikit)相比,它需要的时间更少。但是,我在做这件事时遇到了一些问题。具体来说,当我开始工作时,我不断收到消息“broadcasting large task binary with size X”(X 是从 1700 KiB 到 6 MiB 的数字)。在我离开工作一段时间后,它最终以消息“Job X 取消,因为 SparkContext 被关闭”(对于很多 Xs = 作业)和“ERROR TransportRequestHandler: Error为单向消息调用 RpcHandler#receive() 时。 org.apache.spark.SparkException:找不到 CoarseGrainedScheduler”。
推理:
由于我不得不修改“pyspark.ml.tuning#CrossValidator”中的 CrossValidator _fit 方法的源代码,因此我对它的运作方式非常熟悉,知道它分配任务的方式是通过对每个拆分数据集 使用不同参数设置训练模型。也就是说,CrossValidator _fit 是将整个数据集发送给执行器,每次在每个执行器中单独训练一个具有特定参数组合的模型,似乎 Spark 不太喜欢广播数据集。这是pyspark.ml.tunning _fit 方法的相关部分:
for i in range(nFolds):
validation = datasets[i][1].cache()
train = datasets[i][0].cache()
tasks = _parallelFitTasks(est,train,eva,validation,epm,collectSubModelsParam)
for j,metric,subModel in pool.imap_unordered(lambda f: f(),tasks):
metrics[j] += (metric / nFolds)
if collectSubModelsParam:
subModels[i][j] = subModel
validation.unpersist()
train.unpersist()
我尝试过的:
对于收到的广播警告,我已经尝试了最常见的解决方案,尽管我已经认为它们在我的情况下不起作用。具体来说,我修改了数据的分区和并行化参数,以及执行器和驱动程序的内存大小。
我很确定是否在ml库中存在CrossValidator的分布式实现是因为它实际上很有用。但是,我一定遗漏了一些东西,因为如果我的数据集很大并且需要多次广播(因为实现),我无法考虑如何使其工作。也许我错过了什么?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)