如何每隔5分钟获取最近1小时的数据而不进行分组? 为什么不应该通过火花流来安排火花代码执行使用java.util.Timer的解决方案

问题描述

如何每5分钟触发一次并获取最近1小时的数据?我想出了这一点,但似乎并没有给我最后1个小时的所有记录。我的理由是:

@H_404_2@
  • 读取流,

  • 根据时间戳列过滤最近1小时的数据,并且

  • 使用forEachbatch进行写入/打印。还有

  • 为它添加水印,以免保留所有过去的数据。

     spark.
     readStream.format("delta").table("xxx")
       .withWatermark("ts","60 minutes")
       .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes"))
     .writeStream
       .format("console")
       .trigger(Trigger.ProcessingTime("5 minutes"))
       .foreachBatch{ (batchDF: DataFrame,batchId: Long) =>  batchDF.collect().foreach(println)
            }
     .start()
    
  • 还是我必须使用窗口?但是如果我使用Window并且不想分组,我似乎无法摆脱GroupBy

    spark.
      readStream.format("delta").table("xxx")
        .withWatermark("ts","1 hour")
        .groupBy(window($"ts","1 hour"))
        .count()
     .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime("5 minutes"))
        .foreachBatch{ (batchDF: DataFrame,batchId: Long) => 
             print("...entering foreachBatch...\n")
             batchDF.collect().foreach(println)
             }
     .start()
    

    解决方法

    如果要在自己的计算机中调度处理,则应该使用外部调度程序(cron等)或API java.util.Timer,而不是每隔5分钟使用Spark Streaming执行一次Spark代码。代码

    为什么不应该通过火花流来安排火花代码执行

    如果您使用Spark Streaming来调度代码,则会遇到两个问题。

    第一个问题是,火花流仅处理一次数据。因此,每5分钟仅加载一次新记录。您可以考虑使用窗口函数来绕过此问题,而使用collect_listuser defined aggregate function来检索聚合的行列表,但是您将遇到第二个问题。

    第二个问题,尽管将每5分钟触发一次处理,但是foreachBatch中的函数仅在有新记录要处理时才会执行。在两次执行之间的5分钟间隔内没有新记录,则什么也没有发生。

    总而言之,火花流并非旨在安排在特定时间间隔执行火花代码。

    使用java.util.Timer的解决方案

    因此,您应该使用调度程序,而不是使用诸如外部流(如cronoozieairflow等)或在您的代码中使用调度程序,而不是使用火花流

    如果需要在代码中执行此操作,则可以如下使用java.util.Timer

    import org.apache.spark.sql.functions.{current_timestamp,expr}
    import spark.implicits._
    
    val t = new java.util.Timer()
    val task = new java.util.TimerTask {
      def run(): Unit = {
        spark.read.format("delta").table("xxx")
          .filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
          .collect()
          .foreach(println)
      }
    }
    t.schedule(task,5*60*1000L,5*60*1000L) // 5 minutes
    task.run()