问题描述
我想阅读一个Kafka主题并写入一个Parquet或增量文件,并能够在阅读Kafka主题中的所有消息之前从该Parquet文件中读取。我已经进行了这项工作,但后来进行了更改,现在必须等到所有消息都用完后,实木复合地板文件中才包含任何内容。我的代码在下面。
import org.apache.spark.sql.SparkSession
object MinimalTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("MinimalTest")
.getorCreate()
val kafkabrokers = "localhost:9092"
val topic = "Faketopic"
val startingOffsets = "earliest"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkabrokers)
.option("startingOffsets",startingOffsets)
.option("subscribe",topic)
.load()
val path = "<dir>/MinimalTest"
val checkpointLocation = "<dir>/CheckpointMinimalTest"
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation",checkpointLocation)
.option("path",path)
.start()
spark.streams.awaitAnyTermination()
}
}
我没有找到任何遇到相同问题的人,也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“ enable.auto.commit”设置为true,但随后收到一条错误消息,提示不支持“ enable.auto.commit”。
我正在使用Spark.2.4.4
解决方法
您可以通过在Kafka源选项(Structured Streaming + Kafka Integration Guide)中设置maxOffsetsPerTrigger
来限制每个触发器处理的偏移量:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaBrokers)
.option("startingOffsets",startingOffsets)
.option("maxOffsetsPerTrigger",10)
.option("subscribe",topic)
.load()
如果未定义maxOffsetsPerTrigger
,将使用最新的偏移量,如您在Spark 2.4.4 code中所见。