Flink压背机制

开胃菜

在讲解FlinkcheckPoint背压机制之前,我们先来看下checkpoint背压的相关基础,有助于后面的理解。

作为用户,我们写好Flink的程序,上管理平台提交,Flink就跑起来了(只要程序代码没有问题),细节对用户都是屏蔽的。

545a40ba15ac43e6955a433fd8625516.png

 

实际上大致的流程是这样的:

  1. Flink会根据我们所写代码,会生成一个StreamGraph的图出来,来代表我们所写程序的拓扑结构。

  2. 然后在提交的之前会将StreamGraph这个图优化一把(可以合并的任务进行合并),变成JobGraph

  3. JobGraph提交给JobManager

  4. JobManager收到之后JobGraph之后会根据JobGraph生成ExecutionGraphExecutionGraph 是 JobGraph 的并行化版本)

  5. TaskManager接收到任务之后会将ExecutionGraph生成为真正的物理执行图

4188ec3188a84a81ba905633c6e520a7.png

 

可以看到物理执行图真正运行在TaskManagerTransformSink之间都会有ResultPartitionInputGate这俩个组件,ResultPartition用来发送数据,而InputGate用来接收数据。

31e27d79378d4e8c8e349424484fe2de.png

 

屏蔽掉这些Graph,可以发现Flink的架构是:Client->JobManager->TaskManager

99ffb12ea8f649fa86cd00b2e3de05ff.png

 

从名字就可以看出,JobManager是干「管理」,而TaskManager是真正干活的。回到我们今天的主题checkpoint就是由JobManager发出。

9b250062c0774ddaa12896247b2b5cd3.png

 

Flink本身就是有状态的,Flink可以让你选择执行过程中的数据保存在哪里,目前有三个地方,在Flink的角度称作State Backends

  • MemoryStateBackend(内存)

  • FsstateBackend(文件系统,一般是HSFS)

  • RocksDBStateBackend(RocksDB数据库

同样的,checkpoint信息也是保存在State Backends

28236b4b171c44108f5eaee23e5066c8.png

 

耗子屎

最近在Storm迁移Flink的时候遇到个问题,我来简单描述一下背景。

我们从各个数据源从清洗出数据,借助Flink清洗,组装成一个宽模型,最后交由kylin做近实时数据统计和展示,供运营实时查看。

64f868f111804522b04cda6c59d09d15.png

 

迁移的过程中,发现订单的topic消费延迟了好久,初步怀疑是因为订单上游的并发度不够所影响的,所以调整了两端的并行度重新发布一把。

发布的过程中,系统起来以后,再去看topic 消费延迟的监控,就懵逼了。什么?怎么这么久了啊?丝毫没有降下去的意思。

这时候只能找组内的大神去寻求帮忙了,他排查一番后表示:这checkpoint一直没做上,都堵住了,重新发布的时候只会在上一次checkpoint开始,由于checkpoint长时间没完成掉,所以重新发布数据量会很大。这没啥好办法了,只能在这个堵住的环节下扔掉吧,估计是业务逻辑出了问题。

画外音:接收到订单的数据,会去溯源点击,判断该订单从哪个业务来,经过了哪些的业务,最终是哪块业务致使该订单成交。

f73ff07553b3401b826246aad4a511ee.png

 

画外音:外部真正使用时,依赖「订单结果HBase」数据

afb70d1555f84aeebfdf629c2cad06cd.png

 

我们认为点击的数据有可能会比订单的数据处理要慢一会,所以找不到的数据会间隔一段时间轮询,又因为Flink提供State「状态」 和checkpoint机制,我们把找不到的数据放入ListState按一定的时间轮询就好了(即便系统由于重启或其他原因挂了,也不会把数据丢了)。

理论上只要没问题,这套方案是可行的。但现在结果告诉我们:订单数据报来了以后,一小批量数据一直在「订单结果HBase」没找到数据,就放置到ListState上,然后来一条数据就去遍历ListState。导致的后果就是:

  • 数据消费不过来,形成反压

  • checkpoint一直没成功

当时处理的方式就是把ListState清空掉,暂时丢掉这一部分的数据,让数据追上进度。

后来排查后发现是上游在消息报字段上做了「手脚」,解析失败导致点击丢失,造成这一连锁的后果。

排查问题的关键是理解Flink反压checkpoint的原理是什么样的,下面我来讲述一下。

反压

反压backpressure是流式计算中很常见的问题。它意味着数据管道中某个节点成为瓶颈,处理速率跟不上「上游」发送数据的速率,上游需要进行限速

e389268ae11e4a0189053ce09eec5fe5.png

 

上面的图代表了是反压极简的状态,说白了就是:下游处理不过来了,上游得慢点,要堵了!

最令人好奇的是:“下游是怎么通知上游要发慢点的呢?

在前面Flink的基础知识讲解,我们可以看到ResultPartition用来发送数据,InputGate用来接收数据。

f24a2bf11d994150a9d1e83cb6fc4522.png

 

Flink一个TaskManager内部读写数据的时候,会有一个BufferPool(缓冲池)供该TaskManager读写使用(一个TaskManager共用一个BufferPool),每个读写ResultPartition/InputGate都会去申请自己的LocalBuffer

45cc67a72d9f42028e0a371ff8116daf.png

 

以上图为例,假设下游处理不过来,那InputGateLocalBuffer是不是被填满了?填满了以后,ResultPartition是不是没办法往InputGate发了?而ResultPartition没法发的话,它自己本身的LocalBuffer 也迟早被填满,那是不是依照这个逻辑,一直到Source就不会拉数据了...

5b6160c2a5fa4881aeb5851f97d37d26.png

 

这个过程就犹如InputGate/ResultPartition都开了自己的有界阻塞队列,反正“我”就只能处理这么多,往我这里发,我满了就堵住呗,形成连锁反应一直堵到源头上...

上面是只有一个TaskManager的情况下的反压,那多个TaskManager呢?(毕竟我们很多时候都是有多个TaskManager在为我们工作的)

我们再看回Flink通信的总体数据流向架构图:

8ffc6e8a7c38461e80848d8cf7b045a9.png

 

从图上可以清洗地发现:远程通信用的Netty,底层是TCP Socket来实现的。

所以,从宏观的角度看,多个TaskManager只不过多了两个Buffer(缓冲区)。

按照上面的思路,只要InputGateLocalBuffer被打满,Netty Buffer也迟早被打满,而Socket Buffer同样迟早也会被打满(TCP 本身就带有流量控制),再反馈到ResultPartition上,数据又又又发不出去了...导致整条数据链路都存在反压的现象。

现在问题又来了,一个TaskManagertask可是有很多的,它们都共用一个TCP Buffer/Buffer Pool,那只要其中一个task的链路存在问题,那不导致整个TaskManager跟着遭殃?

f991ec5886fc40768654170f8fbe2b1c.png

 

Flink 1.5版本之前,确实会有这个问题。而在Flink 1.5版本之后则引入了credit机制。

从上面我们看到的Flink所实现的反压,宏观上就是直接依赖各个Buffer是否满了,如果满了则无法写入/读取导致连锁反应,直至Source端。

credit机制,实际上可以简单理解为以「更细粒度」去做流量控制:每次InputGate会告诉ResultPartition自己还有多少的空闲量可以接收,让ResultPartition看着发。如果InputGate告诉ResultPartition已经没有空闲量了,那ResultPartition就不发了。

0bb61c75337f4d829daef8aac576382a.png

 

那实际上是怎么实现的呢?撸源码!

在撸源码之前,我们再来看看下面物理执行图:实际上InPutGate下是InputChannelResultPartition下是ResultSubpartition(这些在源码中都有体现)。

cabc9c87a87342fc80b07a323ce85370.png

 

InputGate(接收端处理反压)

我们先从接收端看起吧。Flink接收数据的方法org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput

随后定位到处理反压的逻辑:

final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

进去getNextNonBlocked()方法看(选择的是BarrierBuffer实现):

0bd56de79913498ab0a4c971747879a7.png

 

我们就直接看null的情况,看下从初始化阶段开始是怎么搞的,进去getNextBufferOrEvent()

进去方法里面看到两个比较重要的调用

83cb8818d6e840d7ad5b1d8fd73a83be.png

 

requestPartitions();

result = currentChannel.getNextBuffer();

先从requestPartitions()看起吧,发现里边套了一层(从InputChannel获取subPartition):

6429bb6d07e64f0cbda6a45ecfd925fa.png

 

于是再进requestSubpartition()(看RemoteInputChannel的实现吧)

在这里看起来就是创建Client端,然后接收上游发送过来的数据:

60a5bcc838034346b0ed63298199974e.png

 

先看看client端的创建姿势吧,进createPartitionRequestClient()方法看看(我们看Netty的实现)。

点了两层,我们会进到createPartitionRequestClient()方法,看源码注释就可以清晰发现,这会创建TCP连接并且创建出Client供我们使用

9466bddc28a649f6acd40d7c844f7e95.png

 

我们还是看null的情况,于是定位到这里:

9817891d6f4843ad9f2b2e83b2be9bd2.png

 

进去connect()方法看看:

d39096a44cbb4db2b37b3c8d83d06fdc.png

 

我们就看看具体生成逻辑的实现吧,所以进到getClientChannelHandlers

意外发现源码还有个通信简要流程图给我们看(哈哈哈):

e67a9482870547708f26dd6c91b6b7b1.png

 

好了,来看看getClientChannelHandlers方法吧,这个方法不长,主要判断了下要生成client是否开启creditBased机制:

public ChannelHandler[] getClientChannelHandlers() {
  NetworkClientHandler networkClientHandler =
   creditBasedEnabled ? new CreditBasedPartitionRequestClientHandler() :
    new PartitionRequestClientHandler();
  return new ChannelHandler[] {
   messageEncoder,
   new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
   networkClientHandler};
 }

于是我们的networkClientHandler实例是CreditBasedPartitionRequestClientHandler

到这里,我们暂且就认为Client端已经生成完了,再退回去getNextBufferOrEvent()这个方法requestPartitions()方法生成接收数据的Client端,具体的实例是CreditBasedPartitionRequestClientHandler

8e2b5cc16f194b8091f7f40b455b9ef0.png

 

下面我们进getNextBuffer()看看接收数据具体是怎么处理的:

c6953896b505457e988c9b6f41543238.png

 

拿到数据后,就会开始执行我们用户代码调用process方法了(这里我们先不看了)。还是回到反压的逻辑上,我们好像还没看到反压的逻辑在哪里。重点就是receivedBuffers这里,是谁塞进去的呢?

于是我们回看到Client具体的实例CreditBasedPartitionRequestClientHandler,打开方法列表一看,感觉就是ChannelRead()没错了:

3a7baceb42614133b02ef92431cbd101.png

 

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  try {
   decodeMsg(msg);
  } catch (Throwable t) {
   notifyAllChannelsOfErrorAndClose(t);
  }
 }

跟着decodeMsg继续往下走吧:

e79c731caf6f401ea2a99cc309a25dba.png

 

继续下到decodeBufferOrEvent()

2785f646cf7943368708819e1af1b058.png

 

继续下到onBuffer

ffb9b3d115674a678e657f6157158f61.png

 

所以我们往onSenderBacklog上看看:

37a3a856f2a5462fb28262e322501e8f.png

 

最后调用notifyCreditAvailableCredit往上游发送:

public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
  ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
 }

2b0242041d444b3482d3f4d674c03700.png

 

最后再画张图来理解一把(关键链路):

8f55106757ad464199cbd8945a307399.png

 

ResultPartition(发送端处理反压)

发送端我们从org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager开始看起

00b7073d622e44cf99e6b293ea2181be.png

 

于是我们进去看fromConfiguration()

f062e8592dad4f069102d47d979c1688.png

 

进去start()去看,随后进入connectionManager.start()(还是看Netty的实例):

f8c5d688718947d1a73c034aed3eddbf.png


进去看service.init()方法做了什么(又看到熟悉的身影): 

4c75fa2af4b340beb5497a3cd3ab664e.png

 

好了,我们再进去getServerChannelHandlers()看看吧:

cad67ccc42f446da8648a21e9a66d806.png

 

有了上面经验的我们,直接进去看看它的方法,没错,又是channnelRead,只是这次是channelRead0

4c5f0ff5cf75489a88d003f57b09eaf4.png

 

ok,我们进去addCredit()看看:

9b4a4b690ecb4badbfcd8479eac13205.png

 

reader.addCredit(credit)只是更新了下数量

public void addCredit(int creditDeltas) {
  numCreditsAvailable += creditDeltas;
 }

重点我们看下enqueueAvailableReader() 方法,而enqueueAvailableReader()的重点就是判断Credit是否足够发送

9fe52a51b24c495088911f40ce488f05.png

 

isAvailable的实现也很简单,就是判断Credit是否大于0且有真实数据可发

5c7d40f09a0540fcab626b63e3b3741e.png

 

writeAndFlushNextMessageIfPossible实际上就是往下游发送数据:

179983753692429cae8d68e33658a7c4.png

 

拿数据的时候会判断Credit是否足够,不足够抛异常:

dc03a401b3dd4a8c8f5f3bf5377ba6da.png

 

再画张图来简单理解一下:

cf0f16083b92407c8748dea18e7739fb.png

 

背压总结

「下游」的处理速度跟不上「上游」的发送速度,从而降低了处理速度,看似是很美好的(毕竟看起来就是帮助我们限流了)。

但在Flink里,背压再加上Checkponit机制,很有可能导致State状态一直变大,拖慢完成checkpoint速度甚至超时失败。

checkpoint处理速度延迟时,会加剧背压的情况(很可能大多数时间都在处理checkpoint了)。

checkpoint做不上时,意味着重启Flink应用就会从上一次完成checkpoint重新执行(...

举个我真实遇到的例子:

我有一个Flink任务,我只给了它一台TaskManager去执行任务,在更新DB的时候发现会有并发的问题。

只有一台TaskManager定位问题很简单,稍微定位了下判断:我更新DB的Sink 并行度调高了。

如果Sink的并行度设置为1,那肯定没有并发的问题,但这样处理起来太慢了。

于是我就在Sink之前根据userId进行keyBy(相同的userId都由同一个Thread处理,那这样就没并发的问题了)

5b316980bc6e4258beb90a13adbc20e4.png

 

看似很美好,但userId存在热点数据的问题,导致下游数据处理形成反压。原本一次checkpoint执行只需要30~40ms反压后一次checkpoint需要2min+

checkpoint执行间隔相对频繁(6s/次),执行时间2min+,最终导致数据一直处理不过来,整条链路的消费速度从原来的3000qps到背压后的300qps,一直堵住(程序没问题,就是处理速度大大下降,影响到数据的最终产出)。

最后

本来想着这篇文章把反压和Checkpoint都一起写了,但写着写着发现有点长了,那checkpoint下一篇吧。

相信我,只要你用到Flink,迟早会遇到这种问题的,现在可能有的同学还没看懂,没关系,先点个赞

相关文章

显卡天梯图2024最新版,显卡是电脑进行图形处理的重要设备,...
初始化电脑时出现问题怎么办,可以使用win系统的安装介质,连...
todesk远程开机怎么设置,两台电脑要在同一局域网内,然后需...
油猴谷歌插件怎么安装,可以通过谷歌应用商店进行安装,需要...
虚拟内存这个名词想必很多人都听说过,我们在使用电脑的时候...