无法使用Spark连续流处理数据

问题描述

我正在开发一个实时流应用程序,该应用程序轮询来自Kafka代理的数据,并且我正在默认情况下使用微结构化(Spark Structured Streaming)对以前使用Spark结构化流的代码进行改编。但是,我不知道如何使用连续流而不是微批处理流来获得类似的行为。这是一段有效的代码:

query = df.writeStream \
        .foreachBatch(foreach_batch_func) \
        .start()

这是我到目前为止对连续流进行的尝试:

query = df \
        .writeStream \
        .foreach(example_func) \
        .trigger(continuous = '1 second') \
        .start()

该应用会弹出以下错误:

连续执行不支持在org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)重试任务

我正在使用Spark(pyspark)3.0.1 w / Scala 2.12,Kafka 2.6.0

提交应用程序时,我要添加jar org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)