Delta Table to Spark Streaming to Synapse Table in azure databricks

问题描述

我需要编写合并的 DELTA 表并将其同步到 Azure 数据仓库。我们正在尝试读取 Delta 表,但 Spark 流不允许写入流到 Synapse 表。

然后我尝试读取 DBFS 文件夹中的镶木地板文件中的 DELTA 表,我们能够批量读取并向 Synapse DWH 表插入/写入数据,但我们无法保持 DELTA 表与 Synapse 表同步。

我们如何才能只在 Synapse 表中拥有最新的快照?我们没有使用 Synapse Analytics 工作区,而是尝试使用数据块中的 Spark 流作业来实现这一点。

任何指示都会有所帮助。

解决方法

您可以尝试通过在写入流的 forEachBatch 中附加新数据数据帧来保持数据同步,此方法允许以任意方式写入数据,如有必要,您可以使用 jdbc 连接到数据仓库:

df = spark.readStream\
          .format('delta')\
          .load(input_path)

df_write = df.writeStream \
            .format("delta") \
            .foreachBatch(batch_process) \
            .option("checkpointLocation",delat_chk_path) \
            .start(sink_path)\

您的批处理功能类似于:

def batch_process(df,batchId):
  
    df = df.transformAsNeeded()
    df.write.jdbc(jdbc_url,table=schema_name + "." + table_name,mode="append",properties=connection_properties)

当然,如果增量插入适合您的问题,您也可以尝试保留一个“最终”表或时态视图,其中填充了您想要的结果数据的快照,当在数据块中生成时,截断数据仓库中的目标表并用这个视图覆盖它(当然使用 jdbc),这可能很慢,如果我没记错的话,可能有一个突触连接器。您可以尝试类似的方法,将文件以 CSV、parquet 或 delta 格式直接写入您的存储帐户,并在突触中引用它与数据集。您还可以进行批量更新,也许使用数据工厂中的集成管道,您可以根据需要使用数据块或笔记本执行。