使用 Spark/Kafka 流处理应用程序记录

问题描述

我刚开始在 Scala 中使用 Spark 和 Kafka 集成。但是,我遇到了记录问题。我尝试了许多不同的日志库,但它们都从 Spark 返回相同的错误

错误如下:Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

我的代码如下:

val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers","host1:9092,host2:9092")
  .option("subscribe","test")
  .load()

// Dataframe of the 'value' column in the original dataframe from above
val msg = df.select("value").as[String]

// modify_msg is a string produced by Extract_info
val modify_msg = Extract_Info(msg.first.getString(0)).toString()

//Error occurs here. I also tried different logger libraries like SLF4J
println(modify_msg)


val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

我想知道是否有办法打印或记录结果。问题是 writeStream.start() 函数仅适用于数据帧,我无法让它打印字符串。任何帮助将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)