在Pyspark结构化流中,如何在写入Kafka之前丢弃已经生成的输出?

问题描述

我正在尝试对Kafka源数据进行结构化流传输(Spark 2.4.0),在该源数据上我将读取最新数据并在10分钟的窗口内执行汇总。写入数据时,我正在使用“更新”模式。

例如,数据模式如下:

tx_id,cust_id,product,timestamp

我的目标是找到最近10分钟内购买了3种以上产品的客户。假设 prod 是从kafka读取的数据框,然后 windowed_df 定义为:

windowed_df_1 = prod.groupBy(window("timestamp","10 minutes"),cust_id).count()
windowed_df = windowed_df_1.filter(col("count")>=3)

然后,我将其与配置单元表“ customer_master”中的主数据框合并,以获取cust_name

final_df = windowed_df.join(customer_master,"cust_id")

最后,将此数据帧写入Kafka接收器(或为简单起见,控制台)

query = final_df.writeStream.outputMode("update").format("console").option("truncate",False).trigger(processingTime='2 minutes').start()
query.awaitTermination()

现在,当此代码每2分钟运行一次时,在随后的运行中,我想舍弃所有已经属于我的输出的客户。即使他们再次购买任何产品,我也不希望他们出现在我的输出中。

我可以将流输出临时写在(可能是配置单元表)的某个地方,并为每次执行做一个 anti-join ”吗? 这样,我还可以在配置单元表中保留历史记录。

我还在某个地方读到了可以将输出写入内存接收器的地方,然后使用df.write将其保存在HDFS / Hive中。 但是如果我们终止工作并重新运行该怎么办?在这种情况下,内存表将丢失。

由于我是结构化流媒体的新手,请提供帮助。

**

  • 更新:-

** 我还尝试了下面的代码在Hive表和控制台(或Kafka接收器)中写入输出

def write_to_hive(df,epoch_id):
    df.persist()
    df.write.format("hive").mode("append").saveAsTable("hive_tab_name")
    pass

final_df.writeStream.outputMode("update").format("console").option("truncate",False).start()

final_df.writeStream.outputMode("update").foreachBatch(write_to_hive).start()

但这仅执行第一个动作,即写入控制台。 如果我先写“ foreachBatch”,它将保存到Hive表中,但不会打印到控制台。

我想写两个不同的接收器。请帮忙。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)