为什么 apache flink 检查点大小非常大?

问题描述


我有一个简单的 Apache Flink 工作:
**DataSource (Apache Kafka) - Filter - KeyBy - CEP Pattern (with timer) - PatternProcessFucntion - KeyedProcessFunction (*这里我有 ValueState(Boolean) 并在 5 分钟内注册计时器。如果 valueState 不为 null,我将更新 valueState (没有什么可在收集器中发送)并更新计时器。如果 valueState 为空,我将保存为 TRUE,然后在收集器中发送输入事件并设置计时器。当 onTimer 方法准备就绪时,我将清理我的 ValueState*)- Sink (阿帕奇卡夫卡)**。
作业设置:
**检查点间隔:5000ms**
**增量检查点:true**
**语义:恰好一次**
**状态后端:RocksDB**
**并行度:4**
从逻辑上讲,我的工作完美无缺,但我遇到了一些问题。
我对我的集群进行了两个测试(2 个作业管理器和 3 个任务管理器):
**第一次测试:**
我开始工作并连接到一个空的 Apache Kafka 主题,然后我在 Flink WEB UI 中看到 **Checkpointing Statistics:**
1)最新确认 - 触发时间 = 5000 毫秒(就像我的检查点间隔)
2)状态大小 = 每 5 秒间隔 340 kb
3)所有状态已完成(蓝色)。
**第二次测试:**
我开始在 Apache Kafka 主题中发送带有其他键(从“1”到 Integer.MAX_VALUE)的 json 消息。发送速度是:1000 条消息/秒然后我在 Flink WEB UI 中看到 **Checkpointing Statistics:**
1) 最新确认 - 触发时间 = 1 - 6 分钟
**我的问题#1:为什么这个时间在增长?它是坏的还是好的?**
2)国家规模不断扩大。我在 Kafka 中发送了大约 10 分钟的消息(1000 x 60 x 10 = 600000 条消息)。发送后状态大小为 100mb - 150mb。
3)发送后我等了大约一个小时,看到:
最新确认 - 触发时间 = 5000 毫秒(就像我的检查点间隔)
状态大小为:每 5 秒间隔 100 mb - 150 mb。
**我的问题#2:为什么不减少?毕竟我检查了我的作业日志并看到了 600000 条记录:**key** 的 ValueState 被清除(OnTimer 方法成功)并且作业逻辑(参见我的 KeyedProcessFunction 的描述)运行良好**
我想做什么?
1) 在检查点之间设置暂停
2)禁用增量检查点
3)启用异步检查点(在 flink-conf.yml 中)
没有任何变化!!!
**我的问题#3:我该怎么办??因为在工业服务器上的速度是:*1000 万条消息/小时*,并且检查点大小会立即增加。**

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...