结构化流输出 - 在不破坏传出读取流顺序保证的情况下使用 OPTIMIZE 进行压缩 注意

问题描述

我有一个来自 kafka 的使用结构化流的传入“仅附加”更新流。使用 foreachBatch 并在其中写入:

parsedDf \
    .select("parsedId","ingestionDate","parsedValue.after","parsedValue.patch","parsedValue.op","parsedvalue.ts_ms",'partition','offset') \
    .write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema","true") \
    .save(f"/mnt/defaultDatalake/{append_table_name}")

后来,在下游作业中,我有一个 readStream 读取在那里创建的文件

问题 - 这些作业创建了大量文件,因为主题不是很完整。虽然下游作业对此很好(从流的末尾读取),但我还需要直接查询这些数据(附加表),但由于涉及的文件数量很多,因此查询时间很长。

当然,我尝试在此存储上使用 OPTIMIZE,但随后它似乎破坏了使用这些文件的 readStream 的顺序保证。

所以我需要 - 一种将小文件滚动到更大文件方法(假设 - 超过一周)而不破坏下游消费者的严格顺序保证(即使它需要重新读取早期数据)

Spark 3. 在数据块 7.5 中运行

解决方法

Databrick 的增量保证

OPTIMIZE guarantees

Performing OPTIMIZE on a table that is a streaming source does not affect 
any current or future streams that treat this table as a source

所以这要么是 Delta 的 OPTIMIZE 中的错误,要么是您的代码中的错误。 我不能说 OPTIMIZE - 它不是开源的。

建议

compaction 的手动方式: (我正在我的项目中运行以下更复杂的版本)尝试跟随。请注意,dataChange 选项对于将增量接收器用作流媒体源非常重要。

spark.read \
        .format("delta") \
        .load(root_path) \
        .write \
        .format("delta") \
        .option("dataChange","false") \
        .mode("overwrite") \
        .save(root_path)

分区压缩,根据 https://mungingdata.com/delta-lake/compact-small-files/,加上我添加的 dataChange

spark.read\
  .format("delta")\
  .load(table)\
  .where(partition)\
  .repartition(numFiles)\
  .write\
  .format("delta")\
  .option("dataChange","false") \
  .mode("overwrite")\
  .option("replaceWhere",partition)\
  .save(table) 

注意

请注意,通过 S3 处理多个并发写入作业是 not supported。这可能是问题的来源之一。