问题描述
df=spark.read.parquet("very large dataset")
df.select(columns)
df.filter("some rows I dont want")
df2=df.groupBy('keys').agg("max of a column")
df=df.drop("columns that will be got from df2")
df=df.join(df2,on=["key cols"],"left")
spark.sparkContext.setCheckpointDir("checkpoint/path")
df3=df.checkpoint()
df4=df3.filter("condition 1").groupBy('key').agg("perform aggregations")
df5=df3.filter("condition 2").select(certain columns).alias(rename them)
df6=df4.join(df5,how="outer") #perform full outer join to get all columns and rows
此时,我收到以下错误:
已解决的 UL#28099 属性缺失 TOOL_ID#27908,LL#27913,LW#27915,UL#27236,UW#27914,SEQ_ID#27907,RESULT#27911,TIME_STAMP#27909,DATE#27910 在操作员 !Project [SEQ_ID#27907,TOOL_ID#27908,日期#27910,结果#27911,UL#28099,cast(CASE WHEN isnull(LL#27913) THEN -Infinity ELSE LL#27913 END as double) AS LL#27246,LW#27915]。具有相同名称的属性出现在操作中: UL。请检查是否使用了正确的属性。;;\n加入 全外\n
但是,当我删除 checkpoint
并像正常缓存的数据框一样运行它时,它工作正常。
如果我的数据集很小,这没问题,但我需要检查点,因为与可用的 EMR 资源相比,我有一个非常大的数据集。
有人遇到过类似的问题吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)