Flink window一翻译官方文档

最近准备用flink对之前项目进行重构,这是一个有挑战(但我很喜欢)的工作。几个月过去了,flink社区比起我做技术调研那阵发生了很多变化(包括blink的版本回推),我这边的版本也由1.4->1.7.2。现在网上有很多大方向的解析(阿里的几次直播),也有大神对框架的深入解析。我准备实际使用中mark一些关键的知识点/api。以窗口是flink一个重要的概念,flink提供了很多种窗口的使用方式,以下为窗口相关文档的第一部分,包含目录窗口功能中窗口折叠函数前的内容。


目录

 


视窗

Windows是处理无限流的核心。Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。本文档重点介绍如何在Flink中执行窗口化以及程序员如何从其提供的功能中获得最大益处。

窗口Flink程序的一般结构如下所示。第一个片段指的是键控流,而第二个片段指的是非**键控流。正如人们所看到的,唯一的区别是keyBy(...)呼吁密钥流和window(...)成为windowAll(...)非键控流。这也将作为页面其余部分的路线图。

键控Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

非键控Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

在上面,方括号([...])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,以便最适合您的需求。

窗口生命周期

简而言之,只要应该属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定 时,窗口将被完全删除allowed lateness(请参阅允许的延迟))。Flink保证仅删除基于时间的窗口而不是其他类型,例如全局窗口(请参阅窗口分配器)。例如,使用基于事件时间的窗口策略,每5分钟创建一个非重叠(或翻滚)的窗口,并允许延迟1分钟,Flink将创建一个新窗口,用于间隔12:0012:05当具有落入此间隔的时间戳的第一个元素到达时,它将在水印通过12:06 时间戳时将其删除。

此外,每个窗口将具有Trigger(参见触发器)和一个函数(ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction)(见窗口功能)连接到它。该函数将包含要应用于窗口内容的计算,而Trigger指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的元素数量大于4”时,或“当水印通过窗口结束时”。触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的元素,而不是窗口元数据。这意味着仍然可以将新数据添加到该窗口。

除了上述之外,您还可以指定一个Evictor(参见Evictors),它可以在触发器触发后以及应用函数之前和/或之后从窗口中删除元素。

在下文中,我们将详细介绍上述每个组件。我们从上面代码段中的必需部分开始(请参阅键控与非键控窗口窗口分配器窗口函数),然后再转到可选部分。

键控与非键控窗口

要指定的第一件事是您的流是否应该键入。必须在定义窗口之前完成此操作。使用the keyBy(...)将您的无限流分成逻辑键控流。如果keyBy(...)未调用,则表示您的流未加密。

对于键控流,可以将传入事件的任何属性用作键(此处有更多详细信息)。拥有键控流将允许您的窗口计算由多个任务并行执行,因为每个逻辑键控流可以独立于其余任务进行处理。引用相同密钥的所有元素将被发送到同一个并行任务。

在非键控流的情况下,您的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,并行性为1。

窗口分配器

指定您的流是否已键入后,下一步是定义一个窗口分配器。窗口分配器定义如何将元素分配给窗口。这是通过WindowAssignerwindow(...)(对于键控流)或windowAll()(对于非键控流)调用中指定您的选择来完成的。

A WindowAssigner负责将每个传入元素分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例,即翻滚窗口滑动窗口会话窗口全局窗口。您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配元素,这可以是处理时间或事件时间。请查看我们关于活动时间的部分,了解处理时间和事件时间之间的差异以及时间戳和水印的生成方式。

基于时间的窗口具有开始时间戳(包括)和结束时间戳(不包括),它们一起描述窗口的大小。在代码中,Flink在使用TimeWindow基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法maxTimestamp(),以及返回给定窗口的最大允许时间戳的附加方法。

在下文中,我们将展示Flink的预定义窗口分配器如何工作以及如何在DataStream程序中使用它们。下图显示了每个分配者的工作情况。紫色圆圈表示流的元素,这些元素由某个键(在这种情况下是用户1用户2用户3)划分。x轴显示时间的进度。

翻折窗口

翻滚窗口分配器的每个元素分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。

以下代码段显示了如何使用翻滚窗口。

DataStream<T> input = ...;
​
// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
​
// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
​
// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

时间间隔可以通过使用一个指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等等。

如上一个示例所示,翻滚窗口分配器还采用可选offset 参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时翻滚窗户划时代对齐,这是你会得到如窗户 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)

滑动窗口

滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。

例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。

以下代码段显示了如何使用滑动窗口。

DataStream<T> input = ...;
​
// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
​
// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);
​
// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

时间间隔可以通过使用一个指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等等。

如上一个示例所示,滑动窗口分配器还采用可选offset参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时窗户半小时滑动与时代一致,那就是你会得到如窗户 1:00:00.000 - 1:59:59.9991:30:00.000 - 2:29:59.999等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.9991:45:00.000 - 2:44:59.999等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量Time.hours(-8)

会话窗口

会话窗口中按活动会话分配器组中的元素。会话窗口不重叠,没有固定的开始和结束时间,与翻滚窗口滑动窗口相反。相反,当会话窗口在一段时间内没有接收到元素时,当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙会话间隙提取器功能,该功能定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续元素将分配给新的会话窗口。

以下代码段显示了如何使用会话窗口。

DataStream<T> input = ...;
​
// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
​
// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

静态间隙可以通过使用中的一个来指定Time.milliseconds(x)Time.seconds(x)Time.minutes(x),等。

通过实现SessionWindowTimeGapExtractor接口指定动态间隙。

注意由于会话窗口没有固定的开始和结束,因此它们的评估方式与翻滚和滑动窗口不同。在内部,会话窗口操作员为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。为了可合并的,会话窗口操作者需要一个合并触发器和一个合并 的窗函数,如ReduceFunctionAggregateFunction,或ProcessWindowFunctionFoldFunction不能合并。)

全局窗口

一个全球性的窗口分配器分配使用相同的密钥相同的单个的所有元素全局窗口。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束。

以下代码段显示了如何使用全局窗口。

DataStream<T> input = ...;
​
input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

窗口功能

定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是窗口函数的职责,窗口函数用于在系统确定窗口准备好进行处理后处理每个(可能是键控的)窗口的元素(请参阅Flink如何确定窗口准备就绪的触发器)。

的窗函数可以是一个ReduceFunctionAggregateFunctionFoldFunctionProcessWindowFunction。前两个可以更有效地执行(参见State Size部分),因为Flink可以在每个窗口到达时递增地聚合它们的元素。A ProcessWindowFunction获取Iterable窗口中包含的所有元素以及有关元素所属窗口的其他元信息。

具有a的窗口转换ProcessWindowFunction不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的所有元素。这可以通过组合来减轻ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction以获得两个窗口元件的增量聚合并且该附加元数据窗口 ProcessWindowFunction接收。我们将查看每个变体的示例。

还原函数

A ReduceFunction指定如何组合输入中的两个元素以生成相同类型的输出元素。Flink使用a ReduceFunction来递增地聚合窗口的元素。

ReduceFunction可以像这样定义和使用A :

DataStream<Tuple2<String, Long>> input = ...;
​
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
上面的示例总结了窗口中所有元素的元组的第二个字段。

聚合函数

An AggregateFunction是一个通用版本,ReduceFunction它有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中元素的类型,并且AggregateFunction具有将一个输入元素添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于OUT从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。

与之相同ReduceFunction,Flink将在窗口到达时递增地聚合窗口的输入元素。

一个AggregateFunction可以被定义并这样使用:

  • Java示例

  • /**
     * The accumulator is used to keep a running sum and a count. The {@code getResult} method
     * computes the average.
     */
    private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
      @Override
      public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
      }
    ​
      @Override
      public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
      }
    ​
      @Override
      public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
      }
    ​
      @Override
      public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
      }
    }
    ​
    DataStream<Tuple2<String, Long>> input = ...;
    ​
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate());

    上面的示例计算窗口中元素的第二个字段的平均值。

    折叠函数

    A FoldFunction指定窗口的输入元素如何与输出类型的元素组合。所述FoldFunction递增称为该被添加到窗口和电流输出值的每个元素。第一个元素与输出类型的预定义初始值组合。

    FoldFunction可以像这样定义和使用A :

  • Java示例

DataStream<Tuple2<String, Long>> input = ...;
​
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>> {
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });

上面的示例将所有输入Long值附加到最初为空String

注意 fold()不能与会话窗口或其他可合并窗口一起使用。


原文连接https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...