Spark结构化流媒体作业问题

问题描述

我正在尝试使用Spark结构化流API来从Azure EventHub流数据,Azure EventHub的消息保留期为4天,如果我连续流数据(即不调用任何Trigger.once,我的流作业就可以正常工作) ()API)。这里的挑战是,我每天只能获得7-8GB的数据,我正在尝试使用Trigger.once()API来实现该作业,以便我可以每天运行一次预定的作业。

当我尝试通过Trigger.once()API运行该作业时,即使EventHub端有可用数据,流作业也会插入4-5个薄荷糖的数据并自动关闭会话。如果我再次触发相同的作业,则该作业将再次插入4-5个薄荷糖的数据并自动关闭会话。

val dataWrite = dtcFinalDF.writeStream.format("JSON").partitionBy("year","month","day").option("checkpointLocation","/mnt/persisted/EventHub_TruckConnectData/TruckConnect_Data_Checkpoint").outputMode(OutputMode.Append).trigger(Trigger.Once()).start("/mnt/persisted/EventHub_TruckConnectData/TruckConnect_Data_Output")

//Process all the available data and close the session

dataWrite.processAllAvailable()
dataWrite.stop()

但是当我手动运行相同的流作业而没有调用Trigger.once()API时,作业会成功将数据插入到blob位置,并且不会间歇性地关闭会话。

dtcFinalDF.writeStream.format("JSON").partitionBy("year","/mnt/truckconnectdatasample/TruckConnect_Data_Checkpoint").outputMode(OutputMode.Append).start("/mnt/truckconnectdatasample/TruckConnect_Data_Output")

我无法在这里破解的一件事是,如果我实现了Trigger.once(),为什么即使在源端(Azure EventHub)上有可用数据,作业也会在4-5分钟后终止。任何帮助将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)