pyspark 中的 For-Loops 导致数据帧大小增加和作业失败

问题描述

我的 pyspark 代码中有一个 for 循环。当我在大约 5 个循环上测试代码时,它工作正常。但是当我在我的核心数据集上运行它导致 160 个循环时,我的 pyspark 作业(在 EMR 集群上提交)失败。它在失败之前先尝试第二次。

以下是作业在 Spark History Server 中运行的屏幕截图:

enter image description here

初始作业 Attempt ID 1 在下午 4:13 运行,4 小时后进行了第二次尝试 Attempt ID 2,但失败了。当我打开作业时,我没有看到任何失败的任务或阶段。 我猜这是因为 for 循环的大小不断增加

这是输出的标准错误日志:它失败,状态为 1

enter image description here

这是我的伪代码

#Load Dataframe
df=spark.read.parquet("s3://path")
df=df.persist(StorageLevel.MEMORY_AND_disK) # I will be using this df in the for loop
flist=list(df.select('key').distinct().toPandas()['key'])
output=[]

for i in flist:
    df2=df.filter(col('key)==i))
    Perform operations on df2 by each key that result in a dataframe df3
    output.append(df3)

final_output = reduce(DataFrame.unionByName,output)

我认为 output 数据帧的大小会增加,作业最终会失败。 我正在运行 9 个工作节点和 8 个 vCore,每个节点有 50GB 内存。

有没有办法在一定次数的循环后将 output 数据帧写入检查点,清除内存,然后从它在 Spark 中停止的地方继续循环?

编辑: 我的预期输出是这样的:

key        mean   prediction
3172742   0.0448    1
3172742   0.0419    1
3172742   0.0482    1
3172742   0.0471    1
3672767   0.0622    2
3672767   0.0551    2
3672767   0.0406    1

我可以使用 groupBy 函数,因为我正在执行 kmeans 聚类并且它不允许 groupBy。所以我必须遍历每个键来执行 kmeans 聚类。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)