How to get the number of records written to Kafka per Spark task in Spark Structured Streaming?

Problem description

I created a custom listener (CustomListener below) by extending SparkListener. When I write the data to files, it correctly prints the number of records written per Spark task, but when I write to Kafka, recordsWrittenCount is always zero.

How can I get the number of records written to Kafka in each Spark task?

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class CustomListener extends SparkListener {

  // Fires once per completed task; reads that task's I/O metrics.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      val recordsWrittenCount = taskEnd.taskMetrics.outputMetrics.recordsWritten
      val recordsReadCount = taskEnd.taskMetrics.inputMetrics.recordsRead
      println(s"TaskId: ${taskEnd.taskInfo.taskId} recordsReadCount: $recordsReadCount recordsWrittenCount: $recordsWrittenCount")
    }
  }
}

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object TestReader extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  spark.sparkContext.addSparkListener(new CustomListener)

  val rates = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "inTopic")
    .load()

  rates
    .writeStream
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {

        // Below prints the correct value for recordsWrittenCount
        batchDF
          .write
          .format("csv")
          .option("delimiter", "|")
          .mode(SaveMode.Overwrite)
          .save("/tmp/KafkaDir")

        // This always reports zero for recordsWrittenCount
//        batchDF
//          .write
//          .format("kafka")
//          .option("kafka.bootstrap.servers", "localhost:9092")
//          .option("checkpointLocation", "/tmp/test2")
//          .option("topic", "outTopic")
//          .save()
      }
    }
    .start()

  spark.streams.awaitAnyTermination()

}

Solution

No verified solution to this problem has been found yet.
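
The most likely explanation is that Spark's Kafka batch writer does not update the task-level outputMetrics that CustomListener reads, so recordsWritten stays at zero regardless of what was actually sent. If a per-micro-batch count is acceptable instead of a per-task one, a possible workaround is to count the batch explicitly inside foreachBatch. A minimal sketch, where writeBatchToKafka is a hypothetical helper and the broker/topic names follow the question's code:

import org.apache.spark.sql.DataFrame

// Hypothetical helper: count the micro-batch ourselves, then write it.
// persist() keeps the extra count() action from re-reading the source.
def writeBatchToKafka(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.persist()
  val written = batchDF.count() // rows about to be written this batch
  batchDF
    .select("key", "value") // Kafka sink needs a value column; key is optional
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "outTopic")
    .save()
  println(s"batchId: $batchId records written to Kafka: $written")
  batchDF.unpersist()
}

Note that this counts the rows handed to the Kafka writer, not the rows Kafka acknowledged; if save() throws, the count is never printed, so only successful batches are reported.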

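Alternatively, if the stream is written to Kafka directly with writeStream rather than via a batch write inside foreachBatch, the per-batch row count may be available from the query progress: on Spark 3.x, SinkProgress exposes a numOutputRows field (sinks or versions that do not support the metric report -1). A sketch of such a listener, under that assumption:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Query-level counting: logs the row count the sink reports for each
// completed micro-batch (-1 when the sink does not report the metric).
class ProgressCounter extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val progress = event.progress
    println(s"batchId: ${progress.batchId} numOutputRows: ${progress.sink.numOutputRows}")
  }
}

// Registered next to the existing SparkListener:
// spark.streams.addListener(new ProgressCounter)

This is still a per-batch rather than per-task figure; Spark does not appear to expose per-task record counts for the Kafka sink through TaskMetrics.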