问题描述
如何每5分钟触发一次并获取最近1小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录。我的理由是:
-
读取流,
-
根据时间戳列过滤最近1小时的数据,并且
-
使用
forEachbatch
进行写入/打印。还有 -
为它添加水印,以免保留所有过去的数据。
spark. readStream.format("delta").table("xxx") .withWatermark("ts","60 minutes") .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes")) .writeStream .format("console") .trigger(Trigger.ProcessingTime("5 minutes")) .foreachBatch{ (batchDF: DataFrame,batchId: Long) => batchDF.collect().foreach(println) } .start()
还是我必须使用窗口?但是如果我使用Window并且不想分组,我似乎无法摆脱GroupBy
。
spark.
readStream.format("delta").table("xxx")
.withWatermark("ts","1 hour")
.groupBy(window($"ts","1 hour"))
.count()
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 minutes"))
.foreachBatch{ (batchDF: DataFrame,batchId: Long) =>
print("...entering foreachBatch...\n")
batchDF.collect().foreach(println)
}
.start()
解决方法
如果要在自己的计算机中调度处理,则应该使用外部调度程序(cron等)或API java.util.Timer,而不是每隔5分钟使用Spark Streaming执行一次Spark代码。代码
为什么不应该通过火花流来安排火花代码执行
如果您使用Spark Streaming来调度代码,则会遇到两个问题。
第一个问题是,火花流仅处理一次数据。因此,每5分钟仅加载一次新记录。您可以考虑使用窗口函数来绕过此问题,而使用collect_list或user defined aggregate function来检索聚合的行列表,但是您将遇到第二个问题。
第二个问题,尽管将每5分钟触发一次处理,但是foreachBatch
中的函数仅在有新记录要处理时才会执行。在两次执行之间的5分钟间隔内没有新记录,则什么也没有发生。
总而言之,火花流并非旨在安排在特定时间间隔执行火花代码。
使用java.util.Timer的解决方案
因此,您应该使用调度程序,而不是使用诸如外部流(如cron,oozie,airflow等)或在您的代码中使用调度程序,而不是使用火花流
如果需要在代码中执行此操作,则可以如下使用java.util.Timer:
import org.apache.spark.sql.functions.{current_timestamp,expr}
import spark.implicits._
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run(): Unit = {
spark.read.format("delta").table("xxx")
.filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
.collect()
.foreach(println)
}
}
t.schedule(task,5*60*1000L,5*60*1000L) // 5 minutes
task.run()