问题描述
我正在尝试执行一个简单的 Spark 结构化流应用程序,它目前对从本地 Kafka 集群中提取并写入本地文件系统没有太大的期望。代码如下:
private static final String TARGET_PATH = "orchestration/target/myfolder/";
private static final String BOOTSTRAP_SERVER = "localhost:9092";
private static final String KAFKA_TOPIC = "twitter_aapl2";
public static void main(String[] args) throws TimeoutException,StreamingQueryException {
SparkSession spark = SparkSession.builder().master("local[*]").appName("spark app").getorCreate();
Dataset<Row> df = spark.readStream().format("kafka")
.option("kafka.bootstrap.servers",BOOTSTRAP_SERVER)
.option("subscribe",KAFKA_TOPIC)
.load();
StreamingQuery query = df.writeStream()
.outputMode("append")
.format("parquet")
.option("path",TARGET_PATH + "data/")
.option("checkpointLocation",TARGET_PATH + "checkpoints/")
.start();
query.awaitTermination();
但是在执行时,我得到以下输出并且我的数据并没有真正得到处理。
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1,groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
我该如何解决这个问题?
解决方法
事实证明,如果人们不是从头开始而是从最新偏移量读取主题,则这种查找和重置行为是非常理想的。然后,管道仅读取在运行时发送到 Kafka 主题的新数据,并且由于没有发送新数据,因此会出现寻找(新数据)和重置(到最新偏移量)的无限循环。
底线,只需从头读取或发送新数据即可解决问题。
,我不得不通过设置日志配置来避免这种情况:
log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=WARN
尽管这是预期的行为,但似乎没有必要每隔几毫秒记录一次“寻求最新偏移量”消息。它隐藏了日志文件中的所有其他应用程序日志。当从一直不是很活跃的主题中消费时,这个问题变得更加令人担忧。如果只是在 DEBUG 级别而不是 INFO 级别会更好。