如何每隔5分钟获取最近1小时的数据而不进行分组? 为什么不应该通过火花流来安排火花代码执行使用java.util.Timer的解决方案

问题描述

如何每5分钟触发一次并获取最近1小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录。我的理由是:

  1. 读取流,

  2. 根据时间戳列过滤最近1小时的数据,并且

  3. 使用forEachbatch进行写入/打印。还有

  4. 为它添加水印,以免保留所有过去的数据。

     spark.
     readStream.format("delta").table("xxx")
       .withWatermark("ts","60 minutes")
       .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes"))
     .writeStream
       .format("console")
       .trigger(Trigger.ProcessingTime("5 minutes"))
       .foreachBatch{ (batchDF: DataFrame,batchId: Long) =>  batchDF.collect().foreach(println)
            }
     .start()
    

还是我必须使用窗口?但是如果我使用Window并且不想分组,我似乎无法摆脱GroupBy

spark.
  readStream.format("delta").table("xxx")
    .withWatermark("ts","1 hour")
    .groupBy(window($"ts","1 hour"))
    .count()
 .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("5 minutes"))
    .foreachBatch{ (batchDF: DataFrame,batchId: Long) => 
         print("...entering foreachBatch...\n")
         batchDF.collect().foreach(println)
         }
 .start()

解决方法

如果要在自己的计算机中调度处理,则应该使用外部调度程序(cron等)或API java.util.Timer,而不是每隔5分钟使用Spark Streaming执行一次Spark代码。代码

为什么不应该通过火花流来安排火花代码执行

如果您使用Spark Streaming来调度代码,则会遇到两个问题。

第一个问题是,火花流仅处理一次数据。因此,每5分钟仅加载一次新记录。您可以考虑使用窗口函数来绕过此问题,而使用collect_listuser defined aggregate function来检索聚合的行列表,但是您将遇到第二个问题。

第二个问题,尽管将每5分钟触发一次处理,但是foreachBatch中的函数仅在有新记录要处理时才会执行。在两次执行之间的5分钟间隔内没有新记录,则什么也没有发生。

总而言之,火花流并非旨在安排在特定时间间隔执行火花代码。

使用java.util.Timer的解决方案

因此,您应该使用调度程序,而不是使用诸如外部流(如cronoozieairflow等)或在您的代码中使用调度程序,而不是使用火花流

如果需要在代码中执行此操作,则可以如下使用java.util.Timer

import org.apache.spark.sql.functions.{current_timestamp,expr}
import spark.implicits._

val t = new java.util.Timer()
val task = new java.util.TimerTask {
  def run(): Unit = {
    spark.read.format("delta").table("xxx")
      .filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
      .collect()
      .foreach(println)
  }
}
t.schedule(task,5*60*1000L,5*60*1000L) // 5 minutes
task.run()

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...