问题描述
我正在开发一个实时流应用程序,该应用程序轮询来自Kafka代理的数据,并且我正在默认情况下使用微结构化(Spark Structured Streaming)对以前使用Spark结构化流的代码进行改编。但是,我不知道如何使用连续流而不是微批处理流来获得类似的行为。这是一段有效的代码:
query = df.writeStream \
.foreachBatch(foreach_batch_func) \
.start()
这是我到目前为止对连续流进行的尝试:
query = df \
.writeStream \
.foreach(example_func) \
.trigger(continuous = '1 second') \
.start()
该应用会弹出以下错误:
连续执行不支持在
org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)
重试任务
我正在使用Spark(pyspark)3.0.1 w / Scala 2.12,Kafka 2.6.0
提交应用程序时,我要添加jar org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
。
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)