问题描述
我正在尝试使用以下代码将流式DataFrame合并到增量表中:
df.sparkSession.sql(s"""
MERGE INTO dTable d
USING updates u
ON u.load_date = MAX(d.load_date)OVER(PARTITION BY d.some_key)
WHEN NOT MATCHED THEN INSERT *
""")
这将导致以下错误:
所以现在我要加载增量表并将其与流表一起加入:
val dTable = spark.read.format("delta").load("/mnt/delta/events/d_table")
deltaLksJobDag.createOrReplaceTempView("d_table")
val dTableWithEndDate = spark.sql("""SELECT delta.some_key as some_key,MAX(delta.load_date)OVER(PARTITION BY delta.job_hkey) as load_end_date
FROM d_table delta""")
deltaLksJobDagWithEndDate.createOrReplaceTempView("d_table_end_date")
val result = spark.sql("""SELECT src.some_key as some_key,src.load_date as load_date
FROM source src
LEFT OUTER JOIN d_table_end_date delta ON src.load_date = delta.load_end_date""")
result
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation","/mnt/delta/events/_checkpoints/d_table")
.start("/mnt/delta/events/d_table")
我认为从性能角度来看,这不如第一种选择好。因此,我想知道在将数据帧合并到增量表中时是否还有另一种使用聚合函数作为条件的方法?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)