Oracle MERGE 重写为 PySpark如果为空 - 更新,否则 - 插入

问题描述

这些是我的桌子:
destination

enter image description here


new_data

enter image description here

在 Oracle sql 中,我可以这样做:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1,c2,d1)
    VALUES (n.c1,n.c2,n.d1);

然后destination表变成这样:

enter image description here

如果 c1c2 存在于 destination 中且 d1 为空,则 d1 被更新。
如果 c1c2 不存在,则插入行。

有没有办法在 PySpark 中做同样的事情?

这会生成数据帧:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getorCreate()

dCols = ['c1','c2','d1']
dData = [('a','b',5),('c','d',None)]
destination = spark.createDataFrame(dData,dCols)

nData = [('a',1),6),('e','f',7),('g','h',None)]
new_data = spark.createDataFrame(nData,dCols)

在 PySpark 中,几乎包含 sql 中的所有内容。但我没有找到 MERGE 的等效项。

解决方法

您可以使用 coalesce

进行左连接并合并列
import pyspark.sql.functions as F

result = new_data.alias('t1').join(
    destination.alias('t2'),['c1','c2'],'full'
).select('c1','c2',F.coalesce('t2.d1','t1.d1').alias('d1'))

result.show()
+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  e|  f|   7|
|  g|  h|null|
|  c|  d|   6|
|  a|  b|   5|
+---+---+----+
,

在 SQL 中,MERGE 可以替换为 left join union right join full outer join:

merged = destination.alias("dest").join(new_data.alias("src"),["c1","c2"],"full") \
    .selectExpr("c1","c2","coalesce(dest.d1,src.d1) as d1")

merged.show()

#+---+---+----+
#| c1| c2|  d1|
#+---+---+----+
#|  e|  f|   7|
#|  g|  h|null|
#|  c|  d|   6|
#|  a|  b|   5|
#+---+---+----+

但是,每次执行此合并时,您都需要将所有数据重写到目标中,因为 Spark 不支持更新并且可能导致性能不佳。所以如果你真的需要这样做,我建议你看看 Delta Lake ,它带来了 ACID 事务触发,并支持 MERGE 语法。