问题描述
我有一个来自 kafka 的使用结构化流的传入“仅附加”更新流。使用 foreachBatch
并在其中写入:
parsedDf \
.select("parsedId","ingestionDate","parsedValue.after","parsedValue.patch","parsedValue.op","parsedvalue.ts_ms",'partition','offset') \
.write \
.format("delta") \
.mode("append") \
.option("mergeSchema","true") \
.save(f"/mnt/defaultDatalake/{append_table_name}")
后来,在下游作业中,我有一个 readStream
读取在那里创建的文件。
问题 - 这些作业创建了大量文件,因为主题不是很完整。虽然下游作业对此很好(从流的末尾读取),但我还需要直接查询这些数据(附加表),但由于涉及的文件数量很多,因此查询时间很长。
当然,我尝试在此存储上使用 OPTIMIZE
,但随后它似乎破坏了使用这些文件的 readStream 的顺序保证。
所以我需要 - 一种将小文件滚动到更大文件的方法(假设 - 超过一周)而不破坏下游消费者的严格顺序保证(即使它需要重新读取早期数据)
Spark 3. 在数据块 7.5 中运行
解决方法
Databrick 的增量保证
Performing OPTIMIZE on a table that is a streaming source does not affect any current or future streams that treat this table as a source
所以这要么是 Delta 的 OPTIMIZE 中的错误,要么是您的代码中的错误。 我不能说 OPTIMIZE - 它不是开源的。
建议
compaction 的手动方式:
(我正在我的项目中运行以下更复杂的版本)尝试跟随。请注意,dataChange
选项对于将增量接收器用作流媒体源非常重要。
spark.read \
.format("delta") \
.load(root_path) \
.write \
.format("delta") \
.option("dataChange","false") \
.mode("overwrite") \
.save(root_path)
分区压缩,根据 https://mungingdata.com/delta-lake/compact-small-files/,加上我添加的 dataChange
:
spark.read\
.format("delta")\
.load(table)\
.where(partition)\
.repartition(numFiles)\
.write\
.format("delta")\
.option("dataChange","false") \
.mode("overwrite")\
.option("replaceWhere",partition)\
.save(table)
注意
请注意,通过 S3 处理多个并发写入作业是 not supported。这可能是问题的来源之一。