join stream-stream-error:pyspark.sql.utils.AnalysisException:'流数据帧/数据集不支持多个流聚合

问题描述

我正在将窗口函数groupby一起使用,并聚合流数据帧的一列。这给出了窗口大小10的基于移动窗口的平均值。 另外,对窗口大小12进行同样的操作。

因此,我最终得到两个移动平均线。我想根据时间戳记的结束时间加入这两个移动平均流。

我反复收到错误消息,

pyspark.sql.utils.AnalysisException: 'Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;\nJoin Inner,

我想知道是否有解决方案。我已经参考了以下链接,但没有帮助,link

我的代码

df_streaming_data= df_streaming_data.withColumn('timestamp_alt',df_streaming_data.timestamp_alt.cast('timestamp'))
w10 = fn.window('timestamp_alt','10 seconds','1 seconds')
df_streaming_data_w10 = df_streaming_data.withWatermark('timestamp_alt','1 minutes') .groupBy(w10).agg(fn.mean('ltp').alias('ma10'))
df_streaming_data_w10 = df_streaming_data_w10.select(df_streaming_data_w10.window.start.cast('timestamp').alias("startma10"),df_streaming_data_w10.window.end.cast('timestamp').alias("endma10"),"ma10")
w12 = fn.window('timestamp_alt','12 seconds','1 seconds')
df_streaming_data_w12 = df_streaming_data.withWatermark('timestamp_alt','1 minutes') .groupBy(w12).agg(fn.mean('ltp').alias('ma12'))
df_streaming_data_w12 = df_streaming_data_w12.select(df_streaming_data_w12.window.start.cast('timestamp').alias("startma12"),df_streaming_data_w12.window.end.cast('timestamp').alias("endma12"),"ma12")
#the code above this line works very well. So we have two moving average data frames ready.
#the following code where we have the join is giving the issue.
df_streaming_filtered = df_streaming_data_w10.join(df_streaming_data_w12,expr("""endma12 = endma10"""))
query = df_streaming_filtered.writeStream.outputMode("append").format("console") .option("truncate",0).start()

在python 3.7中使用Spark 2.3.x。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)