问题描述
我正在从PubSub中读取事件,目标是将事件分组到窗口中。我想使每个窗口的结尾与每小时的0、15、30和45分钟一致。
由于这是一项流工作,因此可以随时启动它,我想找到一种方法来使第一个窗口的大小与下一个窗口的大小对齐。
这将是流:
例如:
- 启动工作
- 现在是11:18,所以请修复
window_size = (11:30-11:18).seconds
- 当第一个窗口结束时,设置
window_size = int(15*60)
(秒)
在Google提供的示例之一中,使用窗口的管道定义如下,其中window_size
是用户作为输入传递的参数:
def expand(self,pcoll):
return (
pcoll
| "Window into Fixed Intervals" >> beam.WindowInto(window.FixedWindows(self.window_size))
| "Add Key" >> beam.Map(lambda elem: (None,elem))
| "Groupby" >> beam.GroupByKey()
| "Abandon Key" >> beam.MapTuple(lambda _,val: val)
)
解决方法
您的用例非常适合Beam!
首先,有一个基本的概念性问题需要解决:
- 用于窗口化的元素上的时间戳称为“事件时间”。它们是数据的一部分,描述了流中何时发生了某些事件。
- 启动和运行作业的时间称为“处理时间”。它不属于您的数据。
如果您不将两者结合或混淆,您将更加成功。 Windows不会在作业处理时间中“开始”或“结束”。 Windows一直存在。
使用15分钟的FixedWindows
即可满足您的需求。每个事件都将与其所属的15分钟间隔相关联。当您启动工作或事件需要处理时,不会对此产生影响。
更新:添加示例进行说明:
假设您在问题中的11:18开始工作,并假设传入的事件大约在同一时间生成。假设出现以下事件,并带有时间戳:
- A @ 11:01
- B @ 11:18
- C @ 11:15
- D @ 11:31
- E @ 11:29
元素将分配给窗口,如下所示:
- [11:00,11:15)中的A
- B在[11:15,11:30)
- C在[11:15,11:30)
- D在[11:30,11:45)
- E在[11:15,11:30)
请注意,窗口分配与您开始工作,事件何时到达或到达顺序无关。您实际上可以在明天启动它,或在存档数据或顺序不尽相同的数据上重新运行它,结果将是相同的。事件时间窗口基于数据。