问题描述
我正在尝试使用spark数据帧更新deltalake表。我想做的是更新spark数据帧中与deltalake表中不同的所有行,并插入deltalake表中所有丢失的行。
我尝试如下操作:
import io.delta.tables._
val not_equal_string = df.schema.fieldNames.map(fn =>
s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
).reduceLeft((x,y) => s"$x OR $y ")
val deltaTable = DeltaTable.forPath(spark,"s3a://sparkdata/delta-table")
deltaTable.as("history").merge(
df.as("updates"),"updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateall().whenNotMatched().insertAll().execute()
这有效,但是当我查看生成的增量表时,即使没有更新单个记录,它的大小实际上也翻了一番。会为每个旧分区生成一个带有 remove 的新json文件,并为所有新分区添加一个 add 。
当我仅使用 whenMatched 条件作为where条件运行sql联接时,我没有得到任何一行。
我希望在进行这样的合并操作后不会更改增量表。我错过了一些简单的东西吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)