合并到deltalake表中更新所有行

问题描述

我正在尝试使用spark数据帧更新deltalake表。我想做的是更新spark数据帧中与deltalake表中不同的所有行,并插入deltalake表中所有丢失的行。

我尝试如下操作:

import io.delta.tables._

val not_equal_string = df.schema.fieldNames.map(fn => 
    s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
    ).reduceLeft((x,y) => s"$x OR $y ")

val deltaTable = DeltaTable.forPath(spark,"s3a://sparkdata/delta-table")

deltaTable.as("history").merge(
    df.as("updates"),"updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateall().whenNotMatched().insertAll().execute()

这有效,但是当我查看生成的增量表时,即使没有更新单个记录,它的大小实际上也翻了一番。会为每个旧分区生成一个带有 remove 的新json文件,并为所有新分区添加一个 add

当我仅使用 whenMatched 条件作为where条件运行sql联接时,我没有得到任何一行。

我希望在进行这样的合并操作后不会更改增量表。我错过了一些简单的东西吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)