Flink WaterMark原理简述

1.Flink中Time概念

我们知道在分布式环境中 Time 是一个很重要的概念,在 Flink 中 Time 可以分为三种Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系我们可以从下图中得知:

null

  • Event-Time 表示事件发生的时间

  • Processing-Time 则表示处理消息的时间

  • Ingestion-Time 表示进入到系统的时间

在 Flink 中我们可以通过下面的方式进行 Time 类型的设置

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 设置使用 ProcessingTime

以上摘自:

Apache Flink 零基础入门(六):Flink Time & Window 解析

2. Flink中Watermark 概念

我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。

那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来小于等于t的数据不会再来,因此可以放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 watermark 例子

null

以上摘自:

Apache Flink 零基础入门(六):Flink Time & Window 解析

3. WaterMark的两种生成方式

image-20220704175658895

如上图watermark的生成有两种方式:

  • 实现AssignerWithPeriodicWatermarks接口(周期性水位线)
    按照固定时间间隔生成新的水位线,不管是否有新的消息抵达,水位线提升的时间间隔是由用户设置的,在两次水位线提升间隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

    public abstract class BoundedOutOfOrdernessTimestampExtractor<T>
            implements AssignerWithPeriodicWatermarks<T> {
       //...
       //获取当前的时间戳
        @Override
        public final Watermark getCurrentWatermark() {
            // 这里保证了watermark是单调递增 
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness; //当前最大时间戳 - 允许乱序延迟时间 相当于: watermark = eventTime -t
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }
       
        //提取时间戳
        @Override
        public final long extractTimestamp(T element, long previousElementTimestamp) {
            //从元素中提取时间戳
            long timestamp = extractTimestamp(element);
            //如果元素中提取的时间戳大于当前最大的时间戳,则更新
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
    }
    
    

    :周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

  • 实现AssignerWithPunctuatedWatermarks接口(标点性水位线)

    标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。

    public class WatermarkOnFlagAssigner 
           implements AssignerWithPunctuatedWatermarks<MyElement> {
         //截取flink源码demo  
         @Override
         public long extractTimestamp(MyElement element, long previousElementTimestamp) {
             //从元素中提取时间戳
              return element.getSequenceTimestamp();
          }
          
          //检查和获取下一个时间戳
         @Override
         public Watermark checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) {
              //元素完成处理就产生一个新的watermark
              return lastElement.isEndOfSequence() ? new Watermark(extractedTimestamp) : null;
         }
      }
      }
    

    :数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

4. 迟到事件

​ 虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

  • 重新激活已经关闭的窗口并重新计算以修正结果。
  • 将迟到事件收集起来另外处理。
  • 将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output和Allowed Lateness。

Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

以上部分摘自:

全网最详细Flink之Watermark机制

相关文章

Flink-core小总结1.实时计算和离线计算1.1离线计算离线计算的...
2022年7月26日,Taier1.2版本正式发布!本次版本发布更新功能...
关于Flink相关的概念性东西就不说了,网上都有,官网也很详尽...
最近准备用flink对之前项目进行重构,这是一个有挑战(但我很...
Thispostoriginallyappearedonthe ApacheFlinkblog.Itwasre...
Flink配置文件对于管理员来说,差不多经常调整的就只有conf下...