使用Scala中的Spark Structured Streaming从kafka流式传输到kafka

问题描述

我正在尝试运行来自官方 spark 教程和一本书“spark Streaming in action”中示例的简单变体。
异常的内容很奇怪。我的代码有什么问题?

首先我启动kafka zookeeper,服务器,生产者和2个消费者。然后我运行以下代码

// read from kafka
val df = sparkService.sparkSession
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("subscribe",topic1)
    .load()

// write to kafka
import sparkService.sparkSession.implicits._

val query = df.selectExpr("CAST(key as STRING)","CAST(value as STRING)")
    .writeStream
    .outputMode(OutputMode.Append())
    .format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    .option("topic",topic2)
    .option("checkpointLocation","/home/pt/Dokumenty/tmp/")
    .option("failOnDataLoss","false") // only when testing
    .start()

query.awaitTermination(30000)

写入 kafka 时发生错误

线程“main” org.apache.spark.sql.streaming.StreamingQueryException 中的异常:预期,例如{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}},得到 1 1609627750463

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)