How to use kafka.group.id and checkpoints in Spark 3.0 Structured Streaming so that, after a restart, reading from Kafka continues where it stopped?

Problem description

Based on what was introduced in Spark 3.0 (https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html), it should be possible to set "kafka.group.id" to track the offsets. For our use case, I want to avoid potential data loss if the streaming Spark job fails and is restarted. Based on my previous questions, I have the feeling that kafka.group.id in Spark 3.0 will help:

How to specify the group id of kafka consumer for spark structured streaming?

How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?

However, I tried the following settings in Spark 3.0.

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType,BooleanType,LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem,Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement


//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS,AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding


object App {
    
    def main(args: Array[String]): Unit = {
      
      val spark: SparkSession = SparkSession.builder()
        .appName("MY-APP")
        .getOrCreate()

      import spark.sqlContext.implicits._

      spark.catalog.clearCache()
      spark.conf.set("spark.sql.autobroadcastJoinThreshold",-1)
      spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

      spark.sparkContext.setLogLevel("ERROR")
      spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
      
      System.gc()
      
      val df = spark.readStream
        .format("kafka")
          .option("kafka.bootstrap.servers","mybroker.io:6667")
          .option("subscribe","mytopic")
          .option("kafka.security.protocol","SASL_SSL")
          .option("kafka.ssl.truststore.location","/home/ec2-user/environment/spark/spark-local/creds/cacerts")
          .option("kafka.ssl.truststore.password","changeit")
          .option("kafka.ssl.truststore.type","JKS")
          .option("kafka.sasl.kerberos.service.name","kafka")
          .option("kafka.sasl.mechanism","GSSAPI")
          .option("kafka.group.id","MYID")
          .load()

      df.printSchema()

      
      val schema = new StructType()
        .add("id",StringType)
        .add("x",StringType)
        .add("eventtime",StringType)

      val idservice = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"),schema).as("data"))
        .select("data.*")

       
      val monitoring_df = idservice
                .selectExpr("cast(id as string) id","cast(x as string) x","cast(eventtime as string) eventtime")              

      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .foreachBatch { (batchDF: DataFrame,batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                    batchDF.persist()
                                    printf("At %d,the %dth microbatch has %d records and %d partitions \n",Instant.Now.getEpochSecond,batchId,batchDF.count(),batchDF.rdd.partitions.size)                                    
                                    batchDF.show()

                                    batchDF.write.mode(SaveMode.Overwrite).option("path","/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
                                    spark.catalog.refreshTable("mytable")
                                    
                                    batchDF.unpersist()
                                    spark.catalog.clearCache()
                                }
                            }
                            .start()
                            .awaitTermination()
    }
   
}

The Spark job was tested in standalone mode with the spark-submit command below, but the same problem exists when I deploy it in cluster mode on AWS EMR.

spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-local/creds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar

I then started the streaming job to read the streaming data from the Kafka topic. After a while, I killed the Spark job and waited for one hour before starting it again. If I understand it correctly, the new streaming data should start from the offset at which I killed the Spark job. However, it still starts from the latest offset, which caused data loss during the time the job was stopped.

Do I need to configure more options to avoid data loss? Or do I have some misunderstanding about Spark 3.0? Thanks!

Problem solved

The key issue here is that the checkpoint must be added to the query specifically. Adding a checkpoint only for the SparkContext is not enough. After adding the checkpoint, it works as expected. In the checkpoint folder, an offsets subfolder is created which contains offset files 0, 1, 2, 3, .... For each file, it shows the offset information for the different partitions:

{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}

One suggestion is to put the checkpoint on external storage such as S3. That helps to recover the offsets even if you need to rebuild the EMR cluster itself.
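
For reference, below is a minimal sketch of what the fixed writeStream looks like. The S3 bucket and paths are placeholders, not the ones from the actual job; any HDFS-compatible location works for checkpointLocation, and the per-batch handling mirrors the query above.

      val monitoring_stream = monitoring_df.writeStream
        .trigger(Trigger.ProcessingTime("120 seconds"))
        .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic") // placeholder bucket/path
        .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
          // Same per-batch handling as in the original query
          if (!batchDF.isEmpty) {
            batchDF.write.mode(SaveMode.Overwrite)
              .option("path", "/home/ec2-user/environment/spark/spark-local/tmp")
              .saveAsTable("mytable")
          }
        }
        .start()
        .awaitTermination()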

Solution

According to the Spark Structured Streaming + Kafka Integration Guide, Spark itself keeps track of the offsets and no offsets are committed back to Kafka. That means if your Spark Streaming job fails and you restart it, all necessary information on the offsets is stored in Spark's checkpointing files.

Even if you set the consumer group name with kafka.group.id, your application will still not commit the messages back to Kafka. The information about the next offset to read is only available in your Spark application's checkpointing files.
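
For illustration, here is a small sketch (the checkpoint path below is a placeholder) that prints those offset files from <checkpointLocation>/offsets; each numbered file corresponds to one micro-batch and records, per topic partition, where the next read starts:

import java.io.File
import scala.io.Source

// Placeholder path; point it at <your query's checkpointLocation>/offsets
val offsetsDir = new File("/path/to/checkpoint/offsets")

Option(offsetsDir.listFiles()).getOrElse(Array.empty[File])
  .filter(_.getName.forall(_.isDigit))   // offset files are named 0, 1, 2, ...
  .sortBy(_.getName.toLong)
  .foreach { f =>
    println(s"--- batch ${f.getName} ---")
    val src = Source.fromFile(f)
    try println(src.mkString) finally src.close()
  }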

If you stop and restart your application without a re-deployment, and you make sure that you do not delete the old checkpoint files, your application will continue reading from where it left off.

The Spark Structured Streaming documentation on Recovering from Failures with Checkpointing says:

"In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) [...]"

This can be achieved by setting the following option in your writeStream query (setting the checkpoint directory only in the SparkContext configuration is not sufficient):

.option("checkpointLocation","path/to/HDFS/dir")

Also note from the documentation: "This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."

In addition, as described in the Output Sinks section, the fault tolerance of Spark Structured Streaming also depends on your output sink.

Since you are currently using the foreachBatch sink, your application may not get the restart guarantees you expect from the sink itself.
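
One common way to make such a foreachBatch sink safe across restarts is to derive the write from the batchId, which Spark replays deterministically from the checkpoint, so that re-processing a batch overwrites the same output instead of duplicating it. A rough sketch of that idea (reusing monitoring_df from the question's code; the bucket and output paths are placeholders):

import org.apache.spark.sql.{DataFrame, SaveMode}

val query = monitoring_df.writeStream
  .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic") // placeholder
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // batchId is replayed from the checkpoint after a restart, so overwriting a
    // batchId-scoped directory makes this write idempotent.
    batchDF.write
      .mode(SaveMode.Overwrite)
      .parquet(s"/path/to/output/batch_id=$batchId") // placeholder output location
  }
  .start()

query.awaitTermination()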
