为什么 spark.ml CrossValidator 给出了“Broadcasting large task binary with size X”和大数据集?

问题描述

问题:

我一直致力于使用 Pyspark 和 Spark ml 库分发 CrossValidation 过程,因此与常规顺序计算(即 scikit)相比,它需要的时间更少。但是,我在做这件事时遇到了一些问题。具体来说,当我开始工作时,我不断收到消息“broadcasting large task binary with size X”(X 是从 1700 KiB 到 6 MiB 的数字)。在我离开工作一段时间后,它最终以消息“Job X 取消,因为 SparkContext 被关闭”(对于很多 Xs = 作业)和“ERROR TransportRequestHandler: Error为单向消息调用 RpcHandler#receive() 时。 org.apache.spark.SparkException:找不到 CoarseGrainedScheduler”。

推理:

由于我不得不修改pyspark.ml.tuning#CrossValidator”中的 CrossValidator _fit 方法的源代码,因此我对它的运作方式非常熟悉,知道它分配任务的方式是通过对每个拆分数据集 使用不同参数设置训练模型。也就是说,CrossValidator _fit 是将整个数据集发送给执行器,每次在每个执行器中单独训练一个具有特定参数组合的模型,似乎 Spark 不太喜欢广播数据集。这是pyspark.ml.tunning _fit 方法的相关部分:

        for i in range(nFolds):
        validation = datasets[i][1].cache()
        train = datasets[i][0].cache()

        tasks = _parallelFitTasks(est,train,eva,validation,epm,collectSubModelsParam)
        for j,metric,subModel in pool.imap_unordered(lambda f: f(),tasks):
            metrics[j] += (metric / nFolds)
            if collectSubModelsParam:
                subModels[i][j] = subModel

        validation.unpersist()
        train.unpersist()

我尝试过的:

对于收到的广播警告,我已经尝试了最常见的解决方案,尽管我已经认为它们在我的情况下不起作用。具体来说,我修改了数据的分区和并行化参数,以及执行器和驱动程序的内存大小。

我很确定是否在ml库中存在CrossValidator的分布式实现是因为它实际上很有用。但是,我一定遗漏了一些东西,因为如果我的数据集很大并且需要多次广播(因为实现),我无法考虑如何使其工作。也许我错过了什么?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)