问题描述
我创建了一个增量表,现在我正在尝试使用 foreachBatch() 将数据合并到该表。我已关注此example。我在谷歌云中的 dataproc 图像 1.5x 中运行此代码。
Spark 2.4.7 版 Delta 版本 0.6.0
我的代码如下:
from delta.tables import *
spark = SparkSession.builder \
.appName("streaming_merge") \
.master("local[*]") \
.config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getorCreate()
# Function to upsert `microBatchOutputDF` into Delta table using MERGE
def mergetoDelta(microBatchOutputDF,batchId):
(deltaTable.alias("accnt").merge(
microBatchOutputDF.alias("updates"),\
"accnt.acct_nbr = updates.acct_nbr") \
.whenMatchedDelete(condition = "updates.cdc_ind='D'") \
.whenMatchedUpdateall(condition = "updates.cdc_ind='U'") \
.whenNotMatchedInsertAll(condition = "updates.cdc_ind!='D'") \
.execute()
)
deltaTable = DeltaTable.forPath(spark,"gs:<<path_for_the_target_delta_table>>")
# Define the source extract
SourceDF = (
spark.readStream \
.format("delta") \
.load("gs://<<path_for_the_source_delta_location>>")
# Start the query to continuously upsert into target tables in update mode
SourceDF.writeStream \
.format("delta") \
.outputMode("update") \
.foreachBatch(mergetoDelta) \
.option("checkpointLocation","gs:<<path_for_the_checkpint_location>>") \
.trigger(once=True) \
.start() \
这段代码运行没有任何问题,但没有数据写入增量表,我怀疑 foreachBatch 没有被调用。有人知道我做错了什么吗?
解决方法
添加 awaitTermination 后,流开始工作并从源获取最新数据并在增量目标表上执行合并。