pysprak - 微批处理流增量表作为对另一个增量表执行合并的源 - foreachbatch 没有被调用

问题描述

我创建了一个增量表,现在我正在尝试使用 foreachBatch() 将数据合并到该表。我已关注此example。我在谷歌云中的 dataproc 图像 1.5x 中运行此代码

Spark 2.4.7 版 Delta 版本 0.6.0

我的代码如下:

from delta.tables import *

spark = SparkSession.builder \
    .appName("streaming_merge") \
    .master("local[*]") \
    .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getorCreate()
  
# Function to upsert `microBatchOutputDF` into Delta table using MERGE
def mergetoDelta(microBatchOutputDF,batchId): 
  (deltaTable.alias("accnt").merge(                                            
       microBatchOutputDF.alias("updates"),\
       "accnt.acct_nbr = updates.acct_nbr") \
     .whenMatchedDelete(condition = "updates.cdc_ind='D'") \
     .whenMatchedUpdateall(condition = "updates.cdc_ind='U'") \
     .whenNotMatchedInsertAll(condition = "updates.cdc_ind!='D'") \
     .execute()
  )

deltaTable = DeltaTable.forPath(spark,"gs:<<path_for_the_target_delta_table>>")

# Define the source extract
SourceDF = (
  spark.readStream \
    .format("delta") \
    .load("gs://<<path_for_the_source_delta_location>>")

# Start the query to continuously upsert into target tables in update mode
SourceDF.writeStream \
  .format("delta") \
  .outputMode("update") \
  .foreachBatch(mergetoDelta) \
  .option("checkpointLocation","gs:<<path_for_the_checkpint_location>>") \
  .trigger(once=True) \
  .start() \

这段代码运行没有任何问题,但没有数据写入增量表,我怀疑 foreachBatch 没有被调用。有人知道我做错了什么吗?

解决方法

添加 awaitTermination 后,流开始工作并从源获取最新数据并在增量目标表上执行合并。