问题描述
我们的Flink作业遇到一个非常难以观察的问题。
工作相当简单,它是:
- 使用Flink Kinesis连接器从Kinesis读取消息
- 键入消息的密钥并将其分发给约30个不同的CEP运算符,以及几个自定义WindowFunctions
- 从CEP / Windows发出的消息被转发到SinkFunction,后者将消息写入SQS
我们正在运行Flink 1.10.1 Fargate,使用2个具有4vcpus / 8GB的容器,我们使用具有以下配置的RocksDB状态后端:
state.backend: rocksdb
state.backend.async: true
state.backend.incremental: false
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.files.open: 130048
该作业的并行度为8。
当作业从冷启动时,它将使用很少的cpu,并且检查点将在2秒内完成。随着时间的推移,检查点的大小会增加,但时间仍然是非常合理的几秒钟:
这期间,由于某些原因,我们可以观察到TaskManager的cpu使用率逐渐增加:
最终,检查点时间将开始达到几分钟,然后开始反复超时(10分钟)。目前:
- 检查点大小(完成后)约为60MB
- cpu使用率很高,但不是100%(通常为60-80%左右)
- 查看进行中的检查点,通常95%+的操作员会在30秒内完成检查点,但是极少数操作员会坚持并且永远不会完成。 SQS接收器将始终包含在此接收器上,但是
SinkFunction
并不丰富且没有状态。 - 在这些操作员上使用背压监视器会报告背压较高
- 由于检查点比例阈值失败,足够的检查点未能触发作业失败
- 检查点最终会开始成功,但是永远不要回落到最初的5-10秒(状态大小更像是30MB与60MB之比)
我们真的不知如何调试它。与您在此处的某些问题中看到的那种状态相比,我们的状态看起来很小。我们的交易量也很低,我们经常低于100条记录/秒。
我们非常感谢在调试方面可以提供的投入。
谢谢
解决方法
几点:
状态随着时间的推移逐渐增长并不罕见。也许您的密钥空间正在增长,并且您正在为每个密钥保留一些状态。如果您要依靠状态TTL来使过期状态失效,则可能未以允许其尽快清除过期状态的方式对其进行配置。不经意间创建CEP模式也相对容易,因为CEP模式需要长时间保持某种状态才能排除某些可能的匹配。
下一步是确定背压的原因。最常见的原因是工作没有足够的资源。随着受管理用户(例如)数量的增加,随着时间的推移,大多数作业逐渐需要更多的资源。例如,您可能需要增加并行度,或为实例提供更多的内存,或增加接收器的容量(或网络到接收器的速度),或为RocksDB提供更快的磁盘。 / p>
除了准备不足之外,引起背压的其他原因还包括
- 阻止i / o是通过用户功能进行的
- 大量计时器同时触发
- 不同来源之间的事件时间偏差导致大量状态被缓冲
- 数据偏斜(热键)使一个子任务或插槽不堪重负
- 长时间的GC暂停
- 争用关键资源(例如,使用NAS作为RocksDB的本地磁盘)
启用RocksDB native metrics可能会提供一些见识。
,将此属性添加到您的配置中:
state.backend.rocksdb.checkpoint.transfer.thread.num: {threadNumberAccordingYourProjectSize}
如果不添加,则为1(默认值)