动态定义apache光束窗口的大小

问题描述

我正在从PubSub中读取事件,目标是将事件分组到窗口中。我想使每个窗口的结尾与每小时的0、15、30和45分钟一致。
由于这是一项流工作,因此可以随时启动它,我想找到一种方法来使第一个窗口的大小与下一个窗口的大小对齐。
这将是流:

  1. 启动工作
  2. 定义此刻与下一刻钟之间的剩余时间为window_size
  3. 从第一个窗口的结尾开始,设置window_size = int(15*60)(秒)。

例如:

  1. 启动工作
  2. 现在是11:18,所以请修复window_size = (11:30-11:18).seconds
  3. 当第一个窗口结束时,设置window_size = int(15*60)(秒)

在Google提供的示例之一中,使用窗口的管道定义如下,其中window_size用户作为输入传递的参数:

def expand(self,pcoll):
  return (
          pcoll
          | "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
          | "Add Key" >> beam.Map(lambda elem: (None,elem))
          | "Groupby" >> beam.GroupByKey()
          | "Abandon Key" >> beam.MapTuple(lambda _,val: val)
  )

解决方法

您的用例非常适合Beam!

首先,有一个基本的概念性问题需要解决:

  • 用于窗口化的元素上的时间戳称为“事件时间”。它们是数据的一部分,描述了流中何时发生了某些事件。
  • 启动和运行作业的时间称为“处理时间”。它不属于您的数据。

如果您不将两者结合或混淆,您将更加成功。 Windows不会在作业处理时间中“开始”或“结束”。 Windows一直存在。

使用15分钟的FixedWindows即可满足您的需求。每个事件都将与其所属的15分钟间隔相关联。当您启动工作或事件需要处理时,不会对此产生影响。

更新:添加示例进行说明:

假设您在问题中的11:18开始工作,并假设传入的事件大约在同一时间生成。假设出现以下事件,并带有时间戳:

  • A @ 11:01
  • B @ 11:18
  • C @ 11:15
  • D @ 11:31
  • E @ 11:29

元素将分配给窗口,如下所示:

  • [11:00,11:15)中的A
  • B在[11:15,11:30)
  • C在[11:15,11:30)
  • D在[11:30,11:45)
  • E在[11:15,11:30)

请注意,窗口分配与您开始工作,事件何时到达或到达顺序无关。您实际上可以在明天启动它,或在存档数据或顺序不尽相同的数据上重新运行它,结果将是相同的。事件时间窗口基于数据