问题描述
我需要编写合并的 DELTA 表并将其同步到 Azure 数据仓库。我们正在尝试读取 Delta 表,但 Spark 流不允许写入流到 Synapse 表。
然后我尝试读取 DBFS 文件夹中的镶木地板文件中的 DELTA 表,我们能够批量读取并向 Synapse DWH 表插入/写入数据,但我们无法保持 DELTA 表与 Synapse 表同步。
我们如何才能只在 Synapse 表中拥有最新的快照?我们没有使用 Synapse Analytics 工作区,而是尝试使用数据块中的 Spark 流作业来实现这一点。
任何指示都会有所帮助。
解决方法
您可以尝试通过在写入流的 forEachBatch 中附加新数据数据帧来保持数据同步,此方法允许以任意方式写入数据,如有必要,您可以使用 jdbc 连接到数据仓库:
df = spark.readStream\
.format('delta')\
.load(input_path)
df_write = df.writeStream \
.format("delta") \
.foreachBatch(batch_process) \
.option("checkpointLocation",delat_chk_path) \
.start(sink_path)\
您的批处理功能类似于:
def batch_process(df,batchId):
df = df.transformAsNeeded()
df.write.jdbc(jdbc_url,table=schema_name + "." + table_name,mode="append",properties=connection_properties)
当然,如果增量插入适合您的问题,您也可以尝试保留一个“最终”表或时态视图,其中填充了您想要的结果数据的快照,当在数据块中生成时,截断数据仓库中的目标表并用这个视图覆盖它(当然使用 jdbc),这可能很慢,如果我没记错的话,可能有一个突触连接器。您可以尝试类似的方法,将文件以 CSV、parquet 或 delta 格式直接写入您的存储帐户,并在突触中引用它与数据集。您还可以进行批量更新,也许使用数据工厂中的集成管道,您可以根据需要使用数据块或笔记本执行。