Spark 检查点导致连接问题

问题描述

我有一段代码,主要执行以下操作:

df=spark.read.parquet("very large dataset")
df.select(columns)
df.filter("some rows I dont want")

df2=df.groupBy('keys').agg("max of a column")
df=df.drop("columns that will be got from df2")
df=df.join(df2,on=["key cols"],"left")

spark.sparkContext.setCheckpointDir("checkpoint/path")
df3=df.checkpoint()

df4=df3.filter("condition 1").groupBy('key').agg("perform aggregations")
df5=df3.filter("condition 2").select(certain columns).alias(rename them)

df6=df4.join(df5,how="outer") #perform full outer join to get all columns and rows

此时,我收到以下错误

解决的 UL#28099 属性缺失 TOOL_ID#27908,LL#27913,LW#27915,UL#27236,UW#27914,SEQ_ID#27907,RESULT#27911,TIME_STAMP#27909,DATE#27910 在操作员 !Project [SEQ_ID#27907,TOOL_ID#27908,日期#27910,结果#27911,UL#28099,cast(CASE WHEN isnull(LL#27913) THEN -Infinity ELSE LL#27913 END as double) AS LL#27246,LW#27915]。具有相同名称属性出现在操作中: UL。请检查是否使用了正确的属性。;;\n加入 全外\n

但是,当我删除 checkpoint 并像正常缓存的数据框一样运行它时,它工作正常。 如果我的数据集很小,这没问题,但我需要检查点,因为与可用的 EMR 资源相比,我有一个非常大的数据集。

有人遇到过类似的问题吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)