问题描述
我刚开始在 Scala 中使用 Spark 和 Kafka 集成。但是,我遇到了记录问题。我尝试了许多不同的日志库,但它们都从 Spark 返回相同的错误。
错误如下:Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
我的代码如下:
val df = spark.read.format("kafka")
.option("kafka.bootstrap.servers","host1:9092,host2:9092")
.option("subscribe","test")
.load()
// Dataframe of the 'value' column in the original dataframe from above
val msg = df.select("value").as[String]
// modify_msg is a string produced by Extract_info
val modify_msg = Extract_Info(msg.first.getString(0)).toString()
//Error occurs here. I also tried different logger libraries like SLF4J
println(modify_msg)
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
我想知道是否有办法打印或记录结果。问题是 writeStream.start()
函数仅适用于数据帧,我无法让它打印字符串。任何帮助将不胜感激。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)