如何使用结构化流从Spark发布到Kafka?

问题描述

我正在编写一个Spark应用程序,该应用程序从Kafka主题读取消息,在数据库中查找记录,构造新消息并将它们发布到另一个Kafka主题。这是我的代码的样子-

val inputMessagesDataSet: DataSet[InputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","server1")
  .option("subscribe","input-kafka-topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[InputMessage] = parseMessages(r)
  }

inputMessagesDataSet
  .writeStream
  .foreachBatch(processMessages _)
  .trigger(trigger)
  .start
  .awaitTermination

def processMessages(inputMessageDataSet: Dataset[InputMessage]) = {
   // fetch stuff from DB and build a DataSet[OutputMessage]
   val outputMessagesDataSet: DataSet[OutputMessage] = ...
   // Now queue to another kafka topic
  outputMessagesDataSet
      .writeStream
      .trigger(trigger)
      .format("kafka")
      .option("kafka.bootstrap.servers","server1")
      .option("topic","output-kafka-topic")
      .option("checkpointLocation",loc)
      .start
      .awaitTermination
}

但是我说错了

org.apache.spark.sql.AnalysisException: 'writeStream' can be called only on streaming Dataset/DataFrame; 在线 outputMessagesDataSet.writeStream

这似乎是因为outputMessagesDataSet不是使用readStream创建的。 我之所以没有在原始DataSet[OutputMessage]中构造mapPartitions()的原因是因为获取数据库记录等所需的类不可序列化,因此它抛出了NotSerializableException。>

如何构造新的数据集并排队到Kafka?

解决方法

foreachBatch接受静态数据集,因此您需要使用write而不是writeStream

或者,您可以writeStream.format("kafka"),而无需使用forEachBatch